使用 map reduce 更新全局变量

Update global variables by using map reduce

假设我在 pyspark 中有这个:

def condi( x ):
    if x["age"] <= 2:
        return True
    else:
        return False

def add_count( x ):
    global aa
    aa += 1
    x["count"] += 10000
    return x

sc = pyspark.SparkContext(  master = 'spark://192.168.56.103:7077',appName = 'test' )

data = [{"age":1,"count":10},{"age":2,"count":20},{"age":3,"count":30}]

data = sc.parallelize( data )

global aa
aa = 0

k = data.map( lambda x : add_count( x ) if condi( x ) else x )

print( k.collect() )
print( aa )

输出如下:

[{'count': 10010, 'age': 1}, {'count': 10020, 'age': 2}, {'count': 30, 'age': 3}] # data
0 # aa

全局变量aa根本没有修改。

如何使用 map reduce 修改全局变量?

您需要将aa声明为Accumulator,以便所有执行者共享。请使用

aa = sc.accumulator(0)

而不是

aa = 0

此更改后,打印出的值将是2

说明:每个执行器都使用自己的本地变量副本。因此,将 +1 添加到执行程序上 aa 的一个 copy 不会更改驱动程序上 aa 的值。语句 print( aa ) 在驱动程序上执行,因此在执行程序上看不到更改。

您还可以查看