Accumulator in pyspark with dict as global variable

For learning purposes, I am trying to use a dict as a global variable through an accumulator. The add function works fine, but when I run the code and update the dict inside a map function, it always comes back empty.

However, similar code that uses a list as the global variable works as expected.

import re
from pyspark.accumulators import AccumulatorParam


class DictParam(AccumulatorParam):
    def zero(self, value=""):
        return dict()

    def addInPlace(self, acc1, acc2):
        acc1.update(acc2)


if __name__ == "__main__":
    sc, sqlContext = init_spark("generate_score_summary", 40)
    rdd = sc.textFile('input')
    #print(rdd.take(5))

    dict1 = sc.accumulator({}, DictParam())

    def file_read(line):
        global dict1
        ls = re.split(',', line)
        dict1 += {ls[0]: ls[1]}
        return line


    rdd = rdd.map(lambda x: file_read(x)).cache()
    print(dict1)

Accumulator updates performed inside a transformation are only applied once that RDD is computed as part of an action.

I believe print(dict1) is executed before rdd.map() is ever actually computed.

In Spark, there are two kinds of operations:

  • transformations, which describe the future computation
  • and actions, which call for the computation to happen and actually trigger the execution (a small sketch follows)
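
A minimal sketch of the difference, assuming an existing SparkContext sc (the RDD contents and variable names are just for illustration):

rdd = sc.parallelize([1, 2, 3, 4])
doubled = rdd.map(lambda x: x * 2)          # transformation: only records the computation, nothing runs yet
total = doubled.reduce(lambda a, b: a + b)  # action: triggers execution and returns 20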

Accumulators are only updated when some action is executed:

Accumulators do not change the lazy evaluation model of Spark. If they are being updated within an operation on an RDD, their value is only updated once that RDD is computed as part of an action.

If you look at the end of this section of the documentation, there is an example exactly like yours:

accum = sc.accumulator(0)
def g(x):
    accum.add(x)
    return f(x)
data.map(g)
# Here, accum is still 0 because no actions have caused the `map` to be computed.

So you need to add some action, for example:

rdd = rdd.map(lambda x: file_read(x)).cache() # transformation
foo = rdd.count() # action
print(dict1)

Be sure to check the details of the various RDD functions and accumulator peculiarities, because this may affect the correctness of your result. (For example, rdd.take(n) will by default only scan one partition, not the whole dataset.)
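
A related peculiarity: Spark only guarantees that accumulator updates performed inside actions are applied exactly once; updates made inside transformations may be re-applied if a task or stage is re-executed. A sketch of the action-based pattern, reusing dict1 and the 'input' file from the question (it also needs the corrected addInPlace from the answer below):

def add_pair(line):
    ls = re.split(',', line)
    dict1.add({ls[0]: ls[1]})           # Accumulator.add() does the same job as +=

sc.textFile('input').foreach(add_pair)  # foreach is an action, so the updates run here
print(dict1.value)                      # read the accumulated dict back on the driver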

For anyone arriving at this thread looking for a dict accumulator for pyspark: the accepted solution does not solve the posed problem.

The issue is actually in the DictParam definition: it does not return the updated dictionary. This works:

class DictParam(AccumulatorParam):
    def zero(self, value=""):
        return dict()

    def addInPlace(self, value1, value2):
        value1.update(value2)
        return value1  # returning the merged dict is what makes the accumulator work

The original code was missing the return value.
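
Putting the pieces together, a minimal end-to-end sketch under the same assumptions as the question (a comma-separated text file at 'input'; a plain local SparkContext stands in for the question's init_spark helper):

import re

from pyspark import SparkContext
from pyspark.accumulators import AccumulatorParam


class DictParam(AccumulatorParam):
    def zero(self, value=None):
        return dict()

    def addInPlace(self, value1, value2):
        value1.update(value2)
        return value1                        # return the merged dict


if __name__ == "__main__":
    sc = SparkContext("local[*]", "dict_accumulator_demo")
    dict1 = sc.accumulator({}, DictParam())

    def file_read(line):
        ls = re.split(',', line)
        dict1.add({ls[0]: ls[1]})            # equivalent to dict1 += {ls[0]: ls[1]}
        return line

    rdd = sc.textFile('input').map(file_read).cache()
    rdd.count()                              # action: forces the map, and with it the accumulator updates
    print(dict1.value)                       # now holds the key/value pairs from the file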