Pyspark 自定义累加器

Pyspark custom accumulators

我试图 运行 这个程序作为 pyspark 中自定义累加器的示例。我收到错误 'int is not iterable'。我无法解决这个问题。有人可以帮我解决这个问题吗?

import findspark
findspark.init()
from pyspark import AccumulatorParam, SparkContext
sc = SparkContext('local','local')

rdd = sc.parallelize(xrange(10))

class SAP(AccumulatorParam):
    def zero(self, initialValue):
        s=set()
        s.add(initialValue)
        return s
    def addInPlace(self, v1, v2):

        return v1.union(v2)



ids_seen = sc.accumulator(0, SAP())
def inc(x):
    global ids_seen
    ids_seen += x
    return x

rdd.foreach(inc)

在类型方面 addInPlace(R, R) => Rzero(R) => R

初始值的类型应与您在累加器中期望的类型相同,因此您必须使用 set:

初始化 Accumulator
ids_seen = sc.accumulator(set(), SAP())

ids_seen = sc.accumulator({0}, SAP())

zero应该是:

def zero(self, initialValue):
    return initialValue.copy()

最后inc应该加一个set:

def inc(x):
    global ids_seen
    ids_seen += {x}
    return x