在 pyspark 中制作字典和 RDD 列表时出现意外结果

Unexpected results when making dicts and lists of RDDs in pyspark

下面是一个简单的 pyspark 脚本,它试图将一个 RDD 拆分成一个包含多个 RDD 的字典。

示例 运行 所示,只有当我们在创建中间 RDD 时对它们执行 collect() 时,该脚本才有效。当然我不想在实践中这样做,因为它无法扩展。

真正奇怪的是,我没有将中间 collect() 结果分配给任何变量。因此,行为上的差异完全是由于 collect() 调用触发的计算的隐藏副作用造成的。

Spark 应该是一个非常实用的框架,副作用最小。为什么只能通过使用 collect() 触发一些神秘的副作用来获得所需的行为?

下面的 运行 是 Spark 1.5.2、Python 2.7.10 和 IPython 4.0.0。

spark_script.py

from pprint import PrettyPrinter
pp = PrettyPrinter(indent=4).pprint
logger = sc._jvm.org.apache.log4j
logger.LogManager.getLogger("org"). setLevel( logger.Level.ERROR )
logger.LogManager.getLogger("akka").setLevel( logger.Level.ERROR )

def split_RDD_by_key(rdd, key_field, key_values, collect_in_loop=False):
    d = dict()
    for key_value in key_values:
        d[key_value] = rdd.filter(lambda row: row[key_field] == key_value)
        if collect_in_loop:
            d[key_value].collect()
    return d
def print_results(d):
    for k in d:
        print k
        pp(d[k].collect())    

rdd = sc.parallelize([
    {'color':'red','size':3},
    {'color':'red', 'size':7},
    {'color':'red', 'size':8},    
    {'color':'red', 'size':10},
    {'color':'green', 'size':9},
    {'color':'green', 'size':5},
    {'color':'green', 'size':50},    
    {'color':'blue', 'size':4},
    {'color':'purple', 'size':6}])
key_field = 'color'
key_values = ['red', 'green', 'blue', 'purple']

print '### run WITH collect in loop: '
d = split_RDD_by_key(rdd, key_field, key_values, collect_in_loop=True)
print_results(d)
print '### run WITHOUT collect in loop: '
d = split_RDD_by_key(rdd, key_field, key_values, collect_in_loop=False)
print_results(d)

IPython shell

中的示例 运行
In [1]: execfile('spark_script.py')
### run WITH collect in loop: 
blue
[{   'color': 'blue', 'size': 4}]
purple
[{   'color': 'purple', 'size': 6}]
green
[   {   'color': 'green', 'size': 9},
    {   'color': 'green', 'size': 5},
    {   'color': 'green', 'size': 50}]
red
[   {   'color': 'red', 'size': 3},
    {   'color': 'red', 'size': 7},
    {   'color': 'red', 'size': 8},
    {   'color': 'red', 'size': 10}]
### run WITHOUT collect in loop: 
blue
[{   'color': 'purple', 'size': 6}]
purple
[{   'color': 'purple', 'size': 6}]
green
[{   'color': 'purple', 'size': 6}]
red
[{   'color': 'purple', 'size': 6}]

简答

事实证明,这与其说是一个 Spark 问题,不如说是一个 tricky Python feature 称为 late-binding 闭包的问题。 强制早期绑定的快速破解(在这种情况下所需的行为)是添加一个默认参数:

lambda row, key_value=key_value: row[key_field] == key_value

另一种方式是 functools.partial。

长答案

当函数在 Python 中定义时,来自函数外部的任何参数都会从定义环境(词法范围)中检索,这样就完成了在计算函数时,而不是在定义函数时(后期绑定)。因此,在过滤器转换使用的 lambda 函数中,key_value 的值直到对函数求值后才确定。

您可以从这里开始看到危险:key_valuesplit_RDDs_by_key() 的循环中采用多个值。如果在评估 lambda 时,key_value 不再具有我们想要的值怎么办?函数通常在定义很久之后才被求值,尤其是在使用 RDD 时。由于 RDD 的惰性计算语义,lambda 直到调用 action 来检索数据时才会被评估,例如 collect()take().

split_RDD_by_key()中,我们遍历key_values并为每个值创建一个新的RDD。当collect_in_loop=False时,没有collect()直到split_RDD_by_key()执行完毕。到那时,里面的循环就完成了,并且 key_value 现在具有来自循环的最后一次迭代的值 'purple'。当对来自 split_RDD_by_key() 的所有 RDD 中的所有 lambda 进行评估时,它们都将 key_value 设置为 'purple' 并检索 RDD 的 'purple' 行。

collect_in_loop=True 时,我们在每次迭代中执行 collect(),导致 lambda 在定义它的同一迭代中被评估,我们得到我们期望的 key_value .

这个例子实际上揭示了关于 python 闭包的一个有趣的、微妙的细节。当 in-loop collect() 触发 lambda 的计算时,lambda 绑定一个值。但是,当 key_value 与第一次 lambda 评估时的状态发生变化(在定义环境中)时,lambda 在使用后面的 collect() 语句进行评估时会做什么?此示例显示 函数闭包的所有计算都基于第一个计算的绑定。 "Calling means closure, once and for all."