计算最大值的 PySpark 累加器
PySpark accumulator that computes maximum value
如果我们需要将累加器的值设置为所有 task/nodes 返回的所有值中的最大值怎么办?
示例:
累加器a
- 节点 1 sets:5
- 节点 2 sets:6
- 节点 3 sets:4
因为6大于4,所以累加器的最终值应该是6。
您必须像这样定义一个 AccumulatorParam
:
from pyspark import AccumulatorParam
class MaxAccumulatorParam(AccumulatorParam):
def zero(self, initialValue):
return initialValue
def addInPlace(self, v1, v2):
return max(v1, v2)
可以如下图使用:
acc = spark.sparkContext.accumulator(float("-inf"), MaxAccumulatorParam())
rdd = sc.parallelize([5, 6, 4], 3)
acc.value
# -inf
rdd.foreach(lambda x: acc.add(x))
acc.value
# 6
如果我们需要将累加器的值设置为所有 task/nodes 返回的所有值中的最大值怎么办?
示例:
累加器a
- 节点 1 sets:5
- 节点 2 sets:6
- 节点 3 sets:4
因为6大于4,所以累加器的最终值应该是6。
您必须像这样定义一个 AccumulatorParam
:
from pyspark import AccumulatorParam
class MaxAccumulatorParam(AccumulatorParam):
def zero(self, initialValue):
return initialValue
def addInPlace(self, v1, v2):
return max(v1, v2)
可以如下图使用:
acc = spark.sparkContext.accumulator(float("-inf"), MaxAccumulatorParam())
rdd = sc.parallelize([5, 6, 4], 3)
acc.value
# -inf
rdd.foreach(lambda x: acc.add(x))
acc.value
# 6