减少对 Spark 的操作
Reduce operation on Spark
我正在尝试减少具有 3 个值的 RDD,因此,首先,我将 rdd 映射为以下格式
a = mytable.rdd.map(lambda w: (w.id,(w.v1,w.v2,w.v3)))
然后在下一步中我使用以下代码减少它
b = a.reduceByKey(lambda a,b,c: (a[0] +','+ a[1],b[0] +','+ b[1],c[0] +','+ c[1]))
但是,我收到一个错误:
TypeError: () 正好接受 3 个参数(给定 2 个)
我的目标是添加该 rdd 的所有值,例如,如果我的 rdd 具有这些值:
[(id1, ('a','b','c')),(id1', ('e','f','g'))]
reduce 后我希望结果按以下顺序排列:
[(id1, ('a,d','b,e','c,f'))]
谢谢
最优解可以表示为:
a.groupByKey().mapValues(lambda vs: [",".join(v) for v in zip(*vs)])
其中 initial groupByKey
将数据分组为等价于的结构:
('id1', [('a','b','c'), ('e','f','g')])
zip(*vs)
将值转置为:
[('a', 'e'), ('b', 'f'), ('c', 'g')]
和 join
的理解连接每个元组。
reduceByKey
在这里确实不是正确的选择(考虑复杂性),但通常它需要两个参数的函数,所以 lambda a, b, c: ...
不会这样做。我相信你想要这样的东西:
lambda a, b: (a[0] + "," + b[0], a[1] + "," + b[1], a[2] + "," + b[2])
我正在尝试减少具有 3 个值的 RDD,因此,首先,我将 rdd 映射为以下格式
a = mytable.rdd.map(lambda w: (w.id,(w.v1,w.v2,w.v3)))
然后在下一步中我使用以下代码减少它
b = a.reduceByKey(lambda a,b,c: (a[0] +','+ a[1],b[0] +','+ b[1],c[0] +','+ c[1]))
但是,我收到一个错误: TypeError: () 正好接受 3 个参数(给定 2 个)
我的目标是添加该 rdd 的所有值,例如,如果我的 rdd 具有这些值:
[(id1, ('a','b','c')),(id1', ('e','f','g'))]
reduce 后我希望结果按以下顺序排列:
[(id1, ('a,d','b,e','c,f'))]
谢谢
最优解可以表示为:
a.groupByKey().mapValues(lambda vs: [",".join(v) for v in zip(*vs)])
其中 initial groupByKey
将数据分组为等价于的结构:
('id1', [('a','b','c'), ('e','f','g')])
zip(*vs)
将值转置为:
[('a', 'e'), ('b', 'f'), ('c', 'g')]
和 join
的理解连接每个元组。
reduceByKey
在这里确实不是正确的选择(考虑复杂性),但通常它需要两个参数的函数,所以 lambda a, b, c: ...
不会这样做。我相信你想要这样的东西:
lambda a, b: (a[0] + "," + b[0], a[1] + "," + b[1], a[2] + "," + b[2])