Pyspark 自定义累加器
Pyspark custom accumulators
我试图 运行 这个程序作为 pyspark 中自定义累加器的示例。我收到错误 'int is not iterable'。我无法解决这个问题。有人可以帮我解决这个问题吗?
import findspark
findspark.init()
from pyspark import AccumulatorParam, SparkContext
sc = SparkContext('local','local')
rdd = sc.parallelize(xrange(10))
class SAP(AccumulatorParam):
def zero(self, initialValue):
s=set()
s.add(initialValue)
return s
def addInPlace(self, v1, v2):
return v1.union(v2)
ids_seen = sc.accumulator(0, SAP())
def inc(x):
global ids_seen
ids_seen += x
return x
rdd.foreach(inc)
在类型方面 addInPlace
是 (R, R) => R
而 zero
是 (R) => R
。
初始值的类型应与您在累加器中期望的类型相同,因此您必须使用 set
:
初始化 Accumulator
ids_seen = sc.accumulator(set(), SAP())
或
ids_seen = sc.accumulator({0}, SAP())
和zero
应该是:
def zero(self, initialValue):
return initialValue.copy()
最后inc
应该加一个set
:
def inc(x):
global ids_seen
ids_seen += {x}
return x
我试图 运行 这个程序作为 pyspark 中自定义累加器的示例。我收到错误 'int is not iterable'。我无法解决这个问题。有人可以帮我解决这个问题吗?
import findspark
findspark.init()
from pyspark import AccumulatorParam, SparkContext
sc = SparkContext('local','local')
rdd = sc.parallelize(xrange(10))
class SAP(AccumulatorParam):
def zero(self, initialValue):
s=set()
s.add(initialValue)
return s
def addInPlace(self, v1, v2):
return v1.union(v2)
ids_seen = sc.accumulator(0, SAP())
def inc(x):
global ids_seen
ids_seen += x
return x
rdd.foreach(inc)
在类型方面 addInPlace
是 (R, R) => R
而 zero
是 (R) => R
。
初始值的类型应与您在累加器中期望的类型相同,因此您必须使用 set
:
Accumulator
ids_seen = sc.accumulator(set(), SAP())
或
ids_seen = sc.accumulator({0}, SAP())
和zero
应该是:
def zero(self, initialValue):
return initialValue.copy()
最后inc
应该加一个set
:
def inc(x):
global ids_seen
ids_seen += {x}
return x