pyspark 折叠方法输出
pyspark fold method output
我对 fold
的这个输出感到惊讶,我无法想象它在做什么。
我预计 something.fold(0, lambda a,b: a+1)
会 return something
中的元素数量,因为折叠从 0
开始并为每个添加 1
元素。
sc.parallelize([1,25,8,4,2]).fold(0,lambda a,b:a+1 )
8
我来自 Scala,其中 fold 的工作方式与我描述的一样。那么 fold 应该如何在 pyspark 中工作?谢谢你的想法。
要了解这里发生了什么,让我们看一下 Spark 的 fold
操作的定义。由于您使用的是 PySpark,我将展示代码的 Python 版本,但 Scala 版本表现出完全相同的行为(您也可以 browse the source on GitHub):
def fold(self, zeroValue, op):
"""
Aggregate the elements of each partition, and then the results for all
the partitions, using a given associative function and a neutral "zero
value."
The function C{op(t1, t2)} is allowed to modify C{t1} and return it
as its result value to avoid object allocation; however, it should not
modify C{t2}.
>>> from operator import add
>>> sc.parallelize([1, 2, 3, 4, 5]).fold(0, add)
15
"""
def func(iterator):
acc = zeroValue
for obj in iterator:
acc = op(obj, acc)
yield acc
vals = self.mapPartitions(func).collect()
return reduce(op, vals, zeroValue)
(比较见Scala implementation of RDD.fold
)。
Spark 的fold
通过先折叠每个分区然后折叠结果来操作。问题是空分区被向下折叠到零元素,因此最终的驱动程序端折叠最终为 每个 分区折叠一个值,而不是每个 非空分区。这意味着 fold
的结果对分区数敏感:
>>> sc.parallelize([1,25,8,4,2], 100).fold(0,lambda a,b:a+1 )
100
>>> sc.parallelize([1,25,8,4,2], 50).fold(0,lambda a,b:a+1 )
50
>>> sc.parallelize([1,25,8,4,2], 1).fold(0,lambda a,b:a+1 )
1
在最后一种情况下,发生的事情是单个分区被折叠成正确的值,然后该值在驱动程序中与零值折叠以产生 1。
看来Spark的fold()
操作其实还要求fold函数除了结合性之外还要有可交换性。实际上,在 Spark 中还有其他地方强加了这一要求,例如一个混洗分区中元素的排序在运行中可能是不确定的(参见 SPARK-5750)。
我已打开 Spark JIRA 票证来调查此问题:https://issues.apache.org/jira/browse/SPARK-6416。
让我试着举个简单的例子来解释一下spark的fold方法。我将在这里使用 pyspark。
rdd1 = sc.parallelize(list([]),1)
上一行将创建一个只有一个分区的空 rdd
rdd1.fold(10, lambda x,y:x+y)
This yield output as 20
rdd2 = sc.parallelize(list([1,2,3,4,5]),2)
上一行将创建值为 1 到 5 的 rdd,并且总共有 2 个分区
rdd2.fold(10, lambda x,y:x+y)
This yields output as 45
因此,在上述情况下,为了简单起见,这里发生的是第零个元素为 10。因此,您将以其他方式获得 RDD 中所有数字的总和现在加上 10(即第零个元素+所有其他元素 => 10+1+2+3+4+5 = 25)。现在我们还有两个分区(即分区数*第零个元素=> 2*10 = 20)
fold 发出的最终输出是 25+20 = 45
使用类似的过程很清楚为什么对 rdd1 的折叠操作产生 20 作为输出。
当我们有类似 rdd1.reduce(lambda x,y:x+y)
的空列表时,Reduce 会失败
ValueError: Can not reduce() empty RDD
如果我们认为我们可以在 rdd 中有空列表,则可以使用折叠
rdd1.fold(0, lambda x,y:x+y)
As expected this will yield output as 0.
我对 fold
的这个输出感到惊讶,我无法想象它在做什么。
我预计 something.fold(0, lambda a,b: a+1)
会 return something
中的元素数量,因为折叠从 0
开始并为每个添加 1
元素。
sc.parallelize([1,25,8,4,2]).fold(0,lambda a,b:a+1 )
8
我来自 Scala,其中 fold 的工作方式与我描述的一样。那么 fold 应该如何在 pyspark 中工作?谢谢你的想法。
要了解这里发生了什么,让我们看一下 Spark 的 fold
操作的定义。由于您使用的是 PySpark,我将展示代码的 Python 版本,但 Scala 版本表现出完全相同的行为(您也可以 browse the source on GitHub):
def fold(self, zeroValue, op):
"""
Aggregate the elements of each partition, and then the results for all
the partitions, using a given associative function and a neutral "zero
value."
The function C{op(t1, t2)} is allowed to modify C{t1} and return it
as its result value to avoid object allocation; however, it should not
modify C{t2}.
>>> from operator import add
>>> sc.parallelize([1, 2, 3, 4, 5]).fold(0, add)
15
"""
def func(iterator):
acc = zeroValue
for obj in iterator:
acc = op(obj, acc)
yield acc
vals = self.mapPartitions(func).collect()
return reduce(op, vals, zeroValue)
(比较见Scala implementation of RDD.fold
)。
Spark 的fold
通过先折叠每个分区然后折叠结果来操作。问题是空分区被向下折叠到零元素,因此最终的驱动程序端折叠最终为 每个 分区折叠一个值,而不是每个 非空分区。这意味着 fold
的结果对分区数敏感:
>>> sc.parallelize([1,25,8,4,2], 100).fold(0,lambda a,b:a+1 )
100
>>> sc.parallelize([1,25,8,4,2], 50).fold(0,lambda a,b:a+1 )
50
>>> sc.parallelize([1,25,8,4,2], 1).fold(0,lambda a,b:a+1 )
1
在最后一种情况下,发生的事情是单个分区被折叠成正确的值,然后该值在驱动程序中与零值折叠以产生 1。
看来Spark的fold()
操作其实还要求fold函数除了结合性之外还要有可交换性。实际上,在 Spark 中还有其他地方强加了这一要求,例如一个混洗分区中元素的排序在运行中可能是不确定的(参见 SPARK-5750)。
我已打开 Spark JIRA 票证来调查此问题:https://issues.apache.org/jira/browse/SPARK-6416。
让我试着举个简单的例子来解释一下spark的fold方法。我将在这里使用 pyspark。
rdd1 = sc.parallelize(list([]),1)
上一行将创建一个只有一个分区的空 rdd
rdd1.fold(10, lambda x,y:x+y)
This yield output as 20
rdd2 = sc.parallelize(list([1,2,3,4,5]),2)
上一行将创建值为 1 到 5 的 rdd,并且总共有 2 个分区
rdd2.fold(10, lambda x,y:x+y)
This yields output as 45
因此,在上述情况下,为了简单起见,这里发生的是第零个元素为 10。因此,您将以其他方式获得 RDD 中所有数字的总和现在加上 10(即第零个元素+所有其他元素 => 10+1+2+3+4+5 = 25)。现在我们还有两个分区(即分区数*第零个元素=> 2*10 = 20) fold 发出的最终输出是 25+20 = 45
使用类似的过程很清楚为什么对 rdd1 的折叠操作产生 20 作为输出。
当我们有类似 rdd1.reduce(lambda x,y:x+y)
ValueError: Can not reduce() empty RDD
如果我们认为我们可以在 rdd 中有空列表,则可以使用折叠
rdd1.fold(0, lambda x,y:x+y)
As expected this will yield output as 0.