在 pyspark 中制作字典和 RDD 列表时出现意外结果
Unexpected results when making dicts and lists of RDDs in pyspark
下面是一个简单的 pyspark
脚本,它试图将一个 RDD 拆分成一个包含多个 RDD 的字典。
如 示例 运行 所示,只有当我们在创建中间 RDD 时对它们执行 collect()
时,该脚本才有效。当然我不想在实践中这样做,因为它无法扩展。
真正奇怪的是,我没有将中间 collect()
结果分配给任何变量。因此,行为上的差异完全是由于 collect()
调用触发的计算的隐藏副作用造成的。
Spark 应该是一个非常实用的框架,副作用最小。为什么只能通过使用 collect()
触发一些神秘的副作用来获得所需的行为?
下面的 运行 是 Spark 1.5.2、Python 2.7.10 和 IPython 4.0.0。
spark_script.py
from pprint import PrettyPrinter
pp = PrettyPrinter(indent=4).pprint
logger = sc._jvm.org.apache.log4j
logger.LogManager.getLogger("org"). setLevel( logger.Level.ERROR )
logger.LogManager.getLogger("akka").setLevel( logger.Level.ERROR )
def split_RDD_by_key(rdd, key_field, key_values, collect_in_loop=False):
d = dict()
for key_value in key_values:
d[key_value] = rdd.filter(lambda row: row[key_field] == key_value)
if collect_in_loop:
d[key_value].collect()
return d
def print_results(d):
for k in d:
print k
pp(d[k].collect())
rdd = sc.parallelize([
{'color':'red','size':3},
{'color':'red', 'size':7},
{'color':'red', 'size':8},
{'color':'red', 'size':10},
{'color':'green', 'size':9},
{'color':'green', 'size':5},
{'color':'green', 'size':50},
{'color':'blue', 'size':4},
{'color':'purple', 'size':6}])
key_field = 'color'
key_values = ['red', 'green', 'blue', 'purple']
print '### run WITH collect in loop: '
d = split_RDD_by_key(rdd, key_field, key_values, collect_in_loop=True)
print_results(d)
print '### run WITHOUT collect in loop: '
d = split_RDD_by_key(rdd, key_field, key_values, collect_in_loop=False)
print_results(d)
IPython shell
中的示例 运行
In [1]: execfile('spark_script.py')
### run WITH collect in loop:
blue
[{ 'color': 'blue', 'size': 4}]
purple
[{ 'color': 'purple', 'size': 6}]
green
[ { 'color': 'green', 'size': 9},
{ 'color': 'green', 'size': 5},
{ 'color': 'green', 'size': 50}]
red
[ { 'color': 'red', 'size': 3},
{ 'color': 'red', 'size': 7},
{ 'color': 'red', 'size': 8},
{ 'color': 'red', 'size': 10}]
### run WITHOUT collect in loop:
blue
[{ 'color': 'purple', 'size': 6}]
purple
[{ 'color': 'purple', 'size': 6}]
green
[{ 'color': 'purple', 'size': 6}]
red
[{ 'color': 'purple', 'size': 6}]
简答
事实证明,这与其说是一个 Spark 问题,不如说是一个 tricky Python feature 称为 late-binding 闭包的问题。 强制早期绑定的快速破解(在这种情况下所需的行为)是添加一个默认参数:
lambda row, key_value=key_value: row[key_field] == key_value
另一种方式是 functools.partial。
长答案
当函数在 Python 中定义时,来自函数外部的任何参数都会从定义环境(词法范围)中检索,这样就完成了在计算函数时,而不是在定义函数时(后期绑定)。因此,在过滤器转换使用的 lambda 函数中,key_value
的值直到对函数求值后才确定。
您可以从这里开始看到危险:key_value
在 split_RDDs_by_key()
的循环中采用多个值。如果在评估 lambda
时,key_value
不再具有我们想要的值怎么办?函数通常在定义很久之后才被求值,尤其是在使用 RDD 时。由于 RDD 的惰性计算语义,lambda 直到调用 action 来检索数据时才会被评估,例如 collect()
或 take()
.
在split_RDD_by_key()
中,我们遍历key_values
并为每个值创建一个新的RDD。当collect_in_loop=False
时,没有collect()
直到split_RDD_by_key()
执行完毕。到那时,里面的循环就完成了,并且 key_value
现在具有来自循环的最后一次迭代的值 'purple'。当对来自 split_RDD_by_key()
的所有 RDD 中的所有 lambda 进行评估时,它们都将 key_value
设置为 'purple' 并检索 RDD 的 'purple' 行。
当 collect_in_loop=True
时,我们在每次迭代中执行 collect()
,导致 lambda 在定义它的同一迭代中被评估,我们得到我们期望的 key_value
.
这个例子实际上揭示了关于 python 闭包的一个有趣的、微妙的细节。当 in-loop collect()
触发 lambda 的计算时,lambda 绑定一个值。但是,当 key_value
与第一次 lambda 评估时的状态发生变化(在定义环境中)时,lambda 在使用后面的 collect()
语句进行评估时会做什么?此示例显示 函数闭包的所有计算都基于第一个计算的绑定。 "Calling means closure, once and for all."
下面是一个简单的 pyspark
脚本,它试图将一个 RDD 拆分成一个包含多个 RDD 的字典。
如 示例 运行 所示,只有当我们在创建中间 RDD 时对它们执行 collect()
时,该脚本才有效。当然我不想在实践中这样做,因为它无法扩展。
真正奇怪的是,我没有将中间 collect()
结果分配给任何变量。因此,行为上的差异完全是由于 collect()
调用触发的计算的隐藏副作用造成的。
Spark 应该是一个非常实用的框架,副作用最小。为什么只能通过使用 collect()
触发一些神秘的副作用来获得所需的行为?
下面的 运行 是 Spark 1.5.2、Python 2.7.10 和 IPython 4.0.0。
spark_script.py
from pprint import PrettyPrinter
pp = PrettyPrinter(indent=4).pprint
logger = sc._jvm.org.apache.log4j
logger.LogManager.getLogger("org"). setLevel( logger.Level.ERROR )
logger.LogManager.getLogger("akka").setLevel( logger.Level.ERROR )
def split_RDD_by_key(rdd, key_field, key_values, collect_in_loop=False):
d = dict()
for key_value in key_values:
d[key_value] = rdd.filter(lambda row: row[key_field] == key_value)
if collect_in_loop:
d[key_value].collect()
return d
def print_results(d):
for k in d:
print k
pp(d[k].collect())
rdd = sc.parallelize([
{'color':'red','size':3},
{'color':'red', 'size':7},
{'color':'red', 'size':8},
{'color':'red', 'size':10},
{'color':'green', 'size':9},
{'color':'green', 'size':5},
{'color':'green', 'size':50},
{'color':'blue', 'size':4},
{'color':'purple', 'size':6}])
key_field = 'color'
key_values = ['red', 'green', 'blue', 'purple']
print '### run WITH collect in loop: '
d = split_RDD_by_key(rdd, key_field, key_values, collect_in_loop=True)
print_results(d)
print '### run WITHOUT collect in loop: '
d = split_RDD_by_key(rdd, key_field, key_values, collect_in_loop=False)
print_results(d)
IPython shell
中的示例 运行In [1]: execfile('spark_script.py')
### run WITH collect in loop:
blue
[{ 'color': 'blue', 'size': 4}]
purple
[{ 'color': 'purple', 'size': 6}]
green
[ { 'color': 'green', 'size': 9},
{ 'color': 'green', 'size': 5},
{ 'color': 'green', 'size': 50}]
red
[ { 'color': 'red', 'size': 3},
{ 'color': 'red', 'size': 7},
{ 'color': 'red', 'size': 8},
{ 'color': 'red', 'size': 10}]
### run WITHOUT collect in loop:
blue
[{ 'color': 'purple', 'size': 6}]
purple
[{ 'color': 'purple', 'size': 6}]
green
[{ 'color': 'purple', 'size': 6}]
red
[{ 'color': 'purple', 'size': 6}]
简答
事实证明,这与其说是一个 Spark 问题,不如说是一个 tricky Python feature 称为 late-binding 闭包的问题。 强制早期绑定的快速破解(在这种情况下所需的行为)是添加一个默认参数:
lambda row, key_value=key_value: row[key_field] == key_value
另一种方式是 functools.partial。
长答案
当函数在 Python 中定义时,来自函数外部的任何参数都会从定义环境(词法范围)中检索,这样就完成了在计算函数时,而不是在定义函数时(后期绑定)。因此,在过滤器转换使用的 lambda 函数中,key_value
的值直到对函数求值后才确定。
您可以从这里开始看到危险:key_value
在 split_RDDs_by_key()
的循环中采用多个值。如果在评估 lambda
时,key_value
不再具有我们想要的值怎么办?函数通常在定义很久之后才被求值,尤其是在使用 RDD 时。由于 RDD 的惰性计算语义,lambda 直到调用 action 来检索数据时才会被评估,例如 collect()
或 take()
.
在split_RDD_by_key()
中,我们遍历key_values
并为每个值创建一个新的RDD。当collect_in_loop=False
时,没有collect()
直到split_RDD_by_key()
执行完毕。到那时,里面的循环就完成了,并且 key_value
现在具有来自循环的最后一次迭代的值 'purple'。当对来自 split_RDD_by_key()
的所有 RDD 中的所有 lambda 进行评估时,它们都将 key_value
设置为 'purple' 并检索 RDD 的 'purple' 行。
当 collect_in_loop=True
时,我们在每次迭代中执行 collect()
,导致 lambda 在定义它的同一迭代中被评估,我们得到我们期望的 key_value
.
这个例子实际上揭示了关于 python 闭包的一个有趣的、微妙的细节。当 in-loop collect()
触发 lambda 的计算时,lambda 绑定一个值。但是,当 key_value
与第一次 lambda 评估时的状态发生变化(在定义环境中)时,lambda 在使用后面的 collect()
语句进行评估时会做什么?此示例显示 函数闭包的所有计算都基于第一个计算的绑定。 "Calling means closure, once and for all."