PySpark 减少按键?添加 Key/Tuple

PySpark reduceByKey? to add Key/Tuple

我有以下数据,我要做的是

[(13, 'D'), (14, 'T'), (32, '6'), (45, 'T'), (47, '2'), (48, '0'), (49, '2'), (50, '0'), (51, 'T'), (53, '2'), (54, '0'), (13, 'A'), (14, 'T'), (32, '6'), (45, 'A'), (47, '2'), (48, '0'), (49, '2'), (50, '0'), (51, 'X')]

是为每个键计数值的实例(1个字符串字符)。于是我先做了一张图:

.map(lambda x: (x[0], [x[1], 1]))

现在成为 key/tuple 个:

[(13, ['D', 1]), (14, ['T', 1]), (32, ['6', 1]), (45, ['T', 1]), (47, ['2', 1]), (48, ['0', 1]), (49, ['2', 1]), (50, ['0', 1]), (51, ['T', 1]), (53, ['2', 1]), (54, ['0', 1]), (13, ['A', 1]), (14, ['T', 1]), (32, ['6', 1]), (45, ['A', 1]), (47, ['2', 1]), (48, ['0', 1]), (49, ['2', 1]), (50, ['0', 1]), (51, ['X', 1])]

我只是无法在最后一部分弄清楚如何为每个键计算该字母的实例。例如键 13 将有 1 个 D 和 1 个 A。而 14 将有 2 个 T,等等

如果我没理解错的话,你可以一次操作完成combineByKey:

from collections import Counter
x = sc.parallelize([(13, 'D'), (14, 'T'), (32, '6'), (45, 'T'), (47, '2'), (48, '0'), (49, '2'), (50, '0'), (51, 'T'), (53, '2'), (54, '0'), (13, 'A'), (14, 'T'), (32, '6'), (45, 'A'), (47, '2'), (48, '0'), (49, '2'), (50, '0'), (51, 'X')]) 
result = x.combineByKey(lambda value:  {value: 1}, 
...                     lambda x, value:  value.get(x,0) + 1,
...                     lambda x, y: dict(Counter(x) + Counter(y)))
result.collect()
[(32, {'6': 2}), (48, {'0': 2}), (49, {'2': 2}), (53, {'2': 1}), (13, {'A': 1, 'D': 1}), (45, {'A': 1, 'T': 1}), (50, {'0': 2}), (54, {'0': 1}), (14, {'T': 2}), (51, {'X': 1, 'T': 1}), (47, {'2': 2})]

我对 Scala 中的 Spark 更熟悉,因此可能有比 Counter 更好的方法来计算 groupByKey 生成的可迭代对象中的字符数,但这里有一个选项:

from collections import Counter

rdd = sc.parallelize([(13, 'D'), (14, 'T'), (32, '6'), (45, 'T'), (47, '2'), (48, '0'), (49, '2'), (50, '0'), (51, 'T'), (53, '2'), (54, '0'), (13, 'A'), (14, 'T'), (32, '6'), (45, 'A'), (47, '2'), (48, '0'), (49, '2'), (50, '0'), (51, 'X')]) 
rdd.groupByKey().mapValues(Counter).collect()

[(48, Counter({'0': 2})),
 (32, Counter({'6': 2})),
 (49, Counter({'2': 2})),
 (50, Counter({'0': 2})),
 (51, Counter({'X': 1, 'T': 1})),
 (53, Counter({'2': 1})),
 (13, Counter({'A': 1, 'D': 1})),
 (45, Counter({'A': 1, 'T': 1})),
 (14, Counter({'T': 2})),
 (54, Counter({'0': 1})),
 (47, Counter({'2': 2}))]

而不是:

.map(lambda x: (x[0], [x[1], 1]))

我们可以这样做:

.map(lambda x: ((x[0], x[1]), 1))

在最后一步,我们可以使用 reduceByKeyadd。请注意,add 来自 operator 包。

放在一起:

from operator import add
rdd = sc.parallelize([(13, 'D'), (14, 'T'), (32, '6'), (45, 'T'), (47, '2'), (48, '0'), (49, '2'), (50, '0'), (51, 'T'), (53, '2'), (54, '0'), (13, 'A'), (14, 'T'), (32, '6'), (45, 'A'), (47, '2'), (48, '0'), (49, '2'), (50, '0'), (51, 'X')]) 
rdd.map(lambda x: ((x[0], x[1]), 1)).reduceByKey(add).collect()

我尝试使用函数和 mapValues() 转换

def f(Counter): return Counter

from collections import Counter

rdd=sc.parallelize([(13, 'D'), (14, 'T'), (32, '6'), (45, 'T'), (47, '2'), (48, '0'), (49, '2'), (50, '0'), (51, 'T'), (53, '2'), (54, '0'), (13, 'A'), (14, 'T'), (32, '6'), (45, 'A'), (47, '2'), (48, '0'), (49, '2'), (50, '0'), (51, 'X')])
rdd.groupByKey().mapValues(Counter).collect()