嵌套元组上的 Pyspark reduceByKey
Pyspark reduceByKey on nested tuples
我的问题与 类似,但有一些关键的区别。我是 PySpark 的新手,所以我肯定遗漏了一些明显的东西。
我有一个具有以下结构的 RDD:
(K0, ((k01,v01), (k02,v02), ...))
....
(Kn, ((kn1,vn1), (kn2,vn2), ...))
我想要的输出类似于
(K0, v01+v02+...)
...
(Kn, vn1+vn2+...)
这似乎是使用 reduceByKey
的完美案例,起初我想到了
rdd.reduceByKey(lambda x,y: x[1]+y[1])
这正是我开始使用的 RDD。我想我的索引有问题,因为有嵌套的元组,但我已经尝试了我能想到的所有可能的索引组合,它一直在给我返回初始的 RDD。
为什么它不能与嵌套元组一起使用,或者我做错了什么?
你根本不应该在这里使用 reduceByKey
。它采用带有签名的关联和交换功能。 (T, T) => T
。很明显,当您将 List[Tuple[U, T]]
作为输入并且期望 T
作为输出时,它不适用。
由于不清楚键是唯一的还是唯一的,让我们考虑一下当我们必须在本地和全局进行聚合时的一般示例。让我们假设 v01
, v02
, ... vm
是简单的数字:
from functools import reduce
from operator import add
def agg_(xs):
# For numeric values sum would be more idiomatic
# but lets make it more generic
return reduce(add, (x[1] for x in xs), zero_value)
zero_value = 0
merge_op = add
def seq_op(acc, xs):
return acc + agg_(xs)
rdd = sc.parallelize([
("K0", (("k01", 3), ("k02", 2))),
("K0", (("k03", 5), ("k04", 6))),
("K1", (("k11", 0), ("k12", -1)))])
rdd.aggregateByKey(0, seq_op, merge_op).take(2)
## [('K0', 16), ('K1', -1)]
如果键已经是唯一的,简单的 mapValues
就足够了:
from itertools import chain
unique_keys = rdd.groupByKey().mapValues(lambda x: tuple(chain(*x)))
unique_keys.mapValues(agg_).take(2)
## [('K0', 16), ('K1', -1)]
我的问题与
我有一个具有以下结构的 RDD:
(K0, ((k01,v01), (k02,v02), ...))
....
(Kn, ((kn1,vn1), (kn2,vn2), ...))
我想要的输出类似于
(K0, v01+v02+...)
...
(Kn, vn1+vn2+...)
这似乎是使用 reduceByKey
的完美案例,起初我想到了
rdd.reduceByKey(lambda x,y: x[1]+y[1])
这正是我开始使用的 RDD。我想我的索引有问题,因为有嵌套的元组,但我已经尝试了我能想到的所有可能的索引组合,它一直在给我返回初始的 RDD。
为什么它不能与嵌套元组一起使用,或者我做错了什么?
你根本不应该在这里使用 reduceByKey
。它采用带有签名的关联和交换功能。 (T, T) => T
。很明显,当您将 List[Tuple[U, T]]
作为输入并且期望 T
作为输出时,它不适用。
由于不清楚键是唯一的还是唯一的,让我们考虑一下当我们必须在本地和全局进行聚合时的一般示例。让我们假设 v01
, v02
, ... vm
是简单的数字:
from functools import reduce
from operator import add
def agg_(xs):
# For numeric values sum would be more idiomatic
# but lets make it more generic
return reduce(add, (x[1] for x in xs), zero_value)
zero_value = 0
merge_op = add
def seq_op(acc, xs):
return acc + agg_(xs)
rdd = sc.parallelize([
("K0", (("k01", 3), ("k02", 2))),
("K0", (("k03", 5), ("k04", 6))),
("K1", (("k11", 0), ("k12", -1)))])
rdd.aggregateByKey(0, seq_op, merge_op).take(2)
## [('K0', 16), ('K1', -1)]
如果键已经是唯一的,简单的 mapValues
就足够了:
from itertools import chain
unique_keys = rdd.groupByKey().mapValues(lambda x: tuple(chain(*x)))
unique_keys.mapValues(agg_).take(2)
## [('K0', 16), ('K1', -1)]