在 pyspark 中的 RDD 上执行两个计数函数

Performing two count functions on an RDD in pyspark

给定一个包含元组的 RDD

(key1, 0)
(key2, 5)
(key3, 11)
(key1, 44)
(key2, 0)
(key3, 43)
(key1, 0)
(key2, 5)
(key3, 33)

对于每个 key,我希望计算 2 个值,每个键值的总计数,或 countByKey() 的常规输出,以及第二个计数,键的正数计数。

所以结果会是这样的:

[(key1, 3, 1),
(key2, 3, 2),
(key3, 3, 3)]

我只想使用 mapreduceByKey 函数:

def map(value):
    return (value[0], (value[1], 1, 1))

return键值对,value为数值的三元组,两个整数用于计数

def reduce(val1, val2):
    # if value[0] is positive, increment first counter
    # in any case, always increment second counter

data.map(map).reduceByKey(reduce).collect()

然后会是这样的:

[(key1, 3, 1),
(key2, 3, 2),
(key3, 3, 3)]

如果我理解正确的话是这样的:

def reduce(val1, val2):
    if val2[0] > 0: # val1 is the accumulator, val2 the new datum
        return (val1[0], val1[1] + 1, val1[2] + 1)
    else:
        return (val1[0], val1[1] + 1, val1[2])

data.map(map).reduceByKey(reduce).collect()

会起作用。

您可以在 map 步骤中检查值是否为正,如果 > 0 则赋值 1,否则 0。然后,像这样按键归约和求和:

rdd1 = data.map(lambda x: (x[0], (1, int(x[1] > 0)))) \
    .reduceByKey(lambda a, b: (a[0] + b[0], a[1] + b[1]))

for k in rdd1.collect():
    print(k)

#('key1', (3, 1))
#('key2', (3, 2))
#('key3', (3, 3))