PySpark 的 reduceByKey 没有按预期工作
PySpark's reduceByKey not working as expected
我正在编写一个大型 PySpark 程序,最近 运行 在 RDD 上使用 reduceByKey
时遇到了麻烦。我已经能够通过一个简单的测试程序重现该问题。代码是:
from pyspark import SparkConf, SparkContext
APP_NAME = 'Test App'
def main(sc):
test = [(0, [i]) for i in xrange(100)]
test = sc.parallelize(test)
test = test.reduceByKey(method)
print test.collect()
def method(x, y):
x.append(y[0])
return x
if __name__ == '__main__':
# Configure Spark
conf = SparkConf().setAppName(APP_NAME)
conf = conf.setMaster('local[*]')
sc = SparkContext(conf=conf)
main(sc)
根据 Spark 文档,我希望输出为 (0, [0,1,2,3,4,...,98,99])
。相反,我得到以下输出:
[(0, [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 24, 36, 48, 60, 72, 84])]
有人可以帮我理解为什么会生成这个输出吗?
作为旁注,当我使用
def method(x, y):
x = x + y
return x
我得到了预期的输出。
首先看起来你真的想要 groupByKey
而不是 reduceByKey
:
rdd = sc.parallelize([(0, i) for i in xrange(100)])
grouped = rdd.groupByKey()
k, vs = grouped.first()
assert len(list(vs)) == 100
Could someone please help me understand why this output is being generated?
reduceByKey
assumes that f
is associative 而你的 method
显然不是。根据操作的顺序,输出是不同的。假设您从某个键的以下数据开始:
[1], [2], [3], [4]
现在让我们添加一些括号:
((([1], [2]), [3]), [4])
(([1, 2], [3]), [4])
([1, 2, 3], [4])
[1, 2, 3, 4]
和另一组括号
(([1], ([2], [3])), [4])
(([1], [2, 3]), [4])
([1, 2], [4])
[1, 2, 4]
当你重写如下:
method = lambda x, y: x + y
或干脆
from operator import add
method = add
你得到了一个关联函数,它按预期工作。
一般来说,对于 reduce*
操作,您需要既关联又 commutative 的函数。
我正在编写一个大型 PySpark 程序,最近 运行 在 RDD 上使用 reduceByKey
时遇到了麻烦。我已经能够通过一个简单的测试程序重现该问题。代码是:
from pyspark import SparkConf, SparkContext
APP_NAME = 'Test App'
def main(sc):
test = [(0, [i]) for i in xrange(100)]
test = sc.parallelize(test)
test = test.reduceByKey(method)
print test.collect()
def method(x, y):
x.append(y[0])
return x
if __name__ == '__main__':
# Configure Spark
conf = SparkConf().setAppName(APP_NAME)
conf = conf.setMaster('local[*]')
sc = SparkContext(conf=conf)
main(sc)
根据 Spark 文档,我希望输出为 (0, [0,1,2,3,4,...,98,99])
。相反,我得到以下输出:
[(0, [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 24, 36, 48, 60, 72, 84])]
有人可以帮我理解为什么会生成这个输出吗?
作为旁注,当我使用
def method(x, y):
x = x + y
return x
我得到了预期的输出。
首先看起来你真的想要 groupByKey
而不是 reduceByKey
:
rdd = sc.parallelize([(0, i) for i in xrange(100)])
grouped = rdd.groupByKey()
k, vs = grouped.first()
assert len(list(vs)) == 100
Could someone please help me understand why this output is being generated?
reduceByKey
assumes that f
is associative 而你的 method
显然不是。根据操作的顺序,输出是不同的。假设您从某个键的以下数据开始:
[1], [2], [3], [4]
现在让我们添加一些括号:
((([1], [2]), [3]), [4])
(([1, 2], [3]), [4])
([1, 2, 3], [4])
[1, 2, 3, 4]
和另一组括号
(([1], ([2], [3])), [4])
(([1], [2, 3]), [4])
([1, 2], [4])
[1, 2, 4]
当你重写如下:
method = lambda x, y: x + y
或干脆
from operator import add
method = add
你得到了一个关联函数,它按预期工作。
一般来说,对于 reduce*
操作,您需要既关联又 commutative 的函数。