新彩天欢迎您!
幻海优品

PySpark - RDD

现在我们已经在我们的系统上安装并配置了PySpark,我们可以在Apache Spark上用Python编程.然而,在这样做之前,让我们理解Spark中的一个基本概念 -  RDD.

RDD代表弹性分布式数据集,这些是运行和运行的元素多个节点在集群上进行并行处理. RDD是不可变元素,这意味着一旦创建了RDD,就无法对其进行更改. RDD也具有容错能力,因此在发生任何故障时,它们会自动恢复.您可以对这些RDD应用多个操作来完成某项任务.

要对这些RDD应用操作,有两种方法和减号;

  • 转型和

  • 行动

让我们了解这两种方式详细.

转型 : 这些是操作,它们应用于RDD以创建新的RDD. Filter,groupBy和map是转换的例子.

动作 : 这些是应用于RDD的操作,它指示Spark执行计算并将结果发送回驱动程序.

要在PySpark中应用任何操作,我们需要创建 PySpark RDD 首先.以下代码块具有PySpark RDD Class : 的详细信息;

class pyspark.RDD (   jrdd,    ctx,    jrdd_deserializer = AutoBatchedSerializer(PickleSerializer()))

让我们看看如何使用PySpark运行一些基本操作. Python文件中的以下代码创建RDD单词,其中存储了一组提到的单词.

words = sc.parallelize (   ["scala",    "java",    "hadoop",    "spark",    "akka",   "spark vs hadoop",    "pyspark",   "pyspark and spark"])

我们现在将对单词进行一些操作.

count()

返回RDD中的元素数.

----------------------------------------count.py---------------------------------------from pyspark import SparkContextsc = SparkContext("local", "count app")words = sc.parallelize (   ["scala",    "java",    "hadoop",    "spark",    "akka",   "spark vs hadoop",    "pyspark",   "pyspark and spark"])counts = words.count()print "Number of elements in RDD -> %i" % (counts)----------------------------------------count.py---------------------------------------

命令 : 号; count()的命令是 :

  $ SPARK_HOME/bin/spark-submit count.py

输出 : 上述命令的输出是 :

  RDD&rarr中的元素数量; 8

collect()

返回RDD中的所有元素.

  ------------------------------------- --- collect.py ---------------------------------------  from pyspark import SparkContextsc = SparkContext("local", "Collect app")words = sc.parallelize (   ["scala",    "java",    "hadoop",    "spark",    "akka",   "spark vs hadoop",    "pyspark",   "pyspark and spark"])coll = words.collect()print "Elements in RDD -> %s" % (coll) ------------ ---------------------------- collect.py ------------------- --------------------

命令 :  collect()的命令是 :

  $ SPARK_HOME/bin/spark-submit collect.py

输出 : 上述命令的输出是 :

Elements in RDD -> [   'scala',    'java',    'hadoop',    'spark',    'akka',    'spark vs hadoop',    'pyspark',    'pyspark and spark']

foreach(f)

仅返回满足foreach内函数条件的元素.在下面的例子中,我们在foreach中调用print函数,它打印RDD中的所有元素.

------- --------------------------------- foreach.py --------------来自pyspark导入的------------------------- from pyspark import SparkContextsc = SparkContext("local", "ForEach app")words = sc.parallelize (   ["scala",    "java",    "hadoop",    "spark",    "akka",   "spark vs hadoop",    "pyspark",   "pyspark and spark"])def f(x): print(x)fore = words.foreach(f) ------------------------------------ ---- foreach.py ---------------------------------------

命令 :  foreach(f)的命令是 :

  $ SPARK_HOME/bin/spark-submit foreach.py

输出 : 上述命令的输出是 :

  scala  java  hadoop  spark  akka  spark vs hadoop  pyspark  pyspark和spark

filter(f)

返回一个包含元素的新RDD,它满足过滤器内部的功能.在下面的示例中,我们过滤掉包含''spark'的字符串.

  ------------- --------------------------- filter.py --------------------来自pyspark导入的-------------------  from pyspark import SparkContextsc = SparkContext("local", "Filter app")words = sc.parallelize (   ["scala",    "java",    "hadoop",    "spark",    "akka",   "spark vs hadoop",    "pyspark",   "pyspark and spark"])words_filter = words.filter(lambda x: 'spark' in x)filtered = words_filter.collect()print "Fitered RDD -> %s" % (filtered) --------------------------------------- -filter.py ----------------------------------------

命令 : 过滤器(f)的命令是 :

  $ SPARK_HOME/bin/spark-submit filter.py

输出 : 上述命令的输出为 : 去;

Fitered RDD -> [   'spark',    'spark vs hadoop',    'pyspark',    'pyspark and spark']

map(f,preservesPartitioning = False)

通过将函数应用于RDD中的每个元素来返回一个新的RDD.在下面的示例中,我们形成一个键值对,并将每个字符串映射为值.

---------------------------------------- map .py --------------------------------------- from pyspark import SparkContextsc = SparkContext("local", "Map app")words = sc.parallelize (   ["scala",    "java",    "hadoop",    "spark",    "akka",   "spark vs hadoop",    "pyspark",   "pyspark and spark"])words_map = words.map(lambda x: (x, 1))mapping = words_map.collect()print "Key value pair -> %s" % (mapping)--------------------------------------- -map.py ---------------------------------------

命令 :  map命令(f,preservesPartitioning = False)是 :

  $ SPARK_HOME/bin/spark-submit map.py

输出 : 以上输出命令是 :

Key value pair -> [   ('scala', 1),    ('java', 1),    ('hadoop', 1),    ('spark', 1),    ('akka', 1),    ('spark vs hadoop', 1),    ('pyspark', 1),    ('pyspark and spark', 1)]

reduce(f)

执行指定的可交换和关联二进制运算后,返回RDD中的元素.在下面的示例中,我们从运算符导入add package并将其应用于'num'以执行简单的加法操作.

----------------------------------- ----- reduce.py --------------------------------------- from pyspark import SparkContextfrom operator import addsc = SparkContext("local", "Reduce app")nums = sc.parallelize([1, 2, 3, 4, 5])adding = nums.reduce(add)print "Adding all the elements -> %i" % (adding)--------------------------------------- -reduce.py ---------------------------------------

命令 :  reduce(f)的命令是 :

  $ SPARK_HOME/bin/spark-submit reduce.py

输出 : 上述命令的输出为 :

Adding all the elements -> 15

加入(其他,numPartitions =无)

它返回RDD,其中包含一对带有匹配键的元素以及该特定键的所有值.在下面的示例中,两个元素中有两对不同的RDD.加入这两个RDD后,我们得到一个RDD,其元素具有匹配的键及其值.

----------- ----------------------------- join.py ------------------来自pyspark导入的--------------------- from pyspark import SparkContextsc = SparkContext("local", "Join app")x = sc.parallelize([("spark", 1), ("hadoop", 4)])y = sc.parallelize([("spark", 2), ("hadoop", 5)])joined = x.join(y)final = joined.collect()print "Join RDD -> %s" % (final)--------------------------------------- -join.py ---------------------------------------

命令 : 连接命令(其他,numPartitions =无)是 :

$ SPARK_HOME/bin/spark-submit join.py

输出 : 以上输出命令是 :

Join RDD -> [   ('spark', (1, 2)),     ('hadoop', (4, 5))]

cache()

持续这个RDD具有默认存储级别(MEMORY_ONLY).您还可以检查RDD是否被缓存.

---------- ------------------------------ cache.py ----------------- ---------------------- from pyspark import SparkContext sc = SparkContext("local", "Cache app") words = sc.parallelize (   ["scala",    "java",    "hadoop",    "spark",    "akka",   "spark vs hadoop",    "pyspark",   "pyspark and spark"]) words.cache() caching = words.persist().is_cached print "Words got chached > %s" % (caching)--------------------------------------- -cache.py ---------------------------------------

命令 : 缓存()的命令是 :

  $ SPARK_HOME/bin/spark-submit cache.py

输出 : 上述程序的输出为 :

Words got cached -> True

这些是一些在PySpark RDD上完成的最重要的操作.

免责声明:以上内容(如有图片或视频亦包括在内)有转载其他网站资源,如有侵权请联系删除