如何在spark中进行分组和加法?

How to group and add up in spark?

我有这样一个 RDD:

{"key1" : "fruit" , "key2" : "US" , "key3" : "1" }

{"key1" : "fruit" , "key2" : "US" , "key3" : "2" }

{"key1" : "vegetable" , "key2" : "US" , "key3" : "1" }

{"key1" : "fruit" , "key2" : "Japan" , "key3" : "3" }

{"key1" : "vegetable" , "key2" : "Japan" , "key3" : "3" }

我的目标是 首先按 key1 分组,然后按 key2 分组 最后添加 key3.

我期待最终结果,

key1          key2      key3
"fruit"     , "US"    , 3
"vegetable" , "US"    , 1
"fruit"     , "Japan" , 3
"vegetable" , "Japan" , 3

我的代码开始如下,

rdd_arm = rdd_arm.map(lambda x: x[1])

rdd_arm 包括上面的键:值格式。

我不确定下一步该去哪里。 有人能帮帮我吗?

我自己解决了

我必须创建一个包含多个密钥的密钥,然后相加。

rdd_arm.map( lambda x : x[0] + ", " + x[1] , x[2] ).reduceByKey( lambda a,b : a + b )

以下问题很有用。

How to group by multiple keys in spark?

让我们创建您的 RDD:

In [1]: rdd_arm = sc.parallelize([{"key1" : "fruit" , "key2" : "US" , "key3" : "1" }, {"key1" : "fruit" , "key2" : "US" , "key3" : "2" }, {"key1" : "vegetable" , "key2" : "US" ,  "key3" : "1" }, {"key1" : "fruit" , "key2" : "Japan" , "key3" : "3" }, {"key1" : "vegetable" , "key2" : "Japan" , "key3" : "3" }])
In [2]: rdd_arm.collect()
Out[2]: 
[{'key1': 'fruit', 'key2': 'US', 'key3': '1'},
 {'key1': 'fruit', 'key2': 'US', 'key3': '2'},
 {'key1': 'vegetable', 'key2': 'US', 'key3': '1'},
 {'key1': 'fruit', 'key2': 'Japan', 'key3': '3'},
 {'key1': 'vegetable', 'key2': 'Japan', 'key3': '3'}]

首先,您必须创建一个新密钥,它将是 key1key2 对。它的值将是 key3,所以你想做这样的事情:

In [3]: new_rdd = rdd_arm.map(lambda x: (x['key1'] + ", " + x['key2'], x['key3']))

In [4]: new_rdd.collect()
Out[4]: 
[('fruit, US', '1'),
 ('fruit, US', '2'),
 ('vegetable, US', '1'),
 ('fruit, Japan', '3'),
 ('vegetable, Japan', '3')]

然后,我们要添加重复键的值,只需调用 reduceByKey(),如下所示:

In [5]: new_rdd = new_rdd.reduceByKey(lambda a, b: int(a) + int(b))

In [6]: new_rdd.collect()
Out[6]: 
[('fruit, US', 3),
 ('fruit, Japan', '3'),
 ('vegetable, US', '1'),
 ('vegetable, Japan', '3')]

大功告成!


当然也可以是单行的,像这样:

new_rdd = rdd_arm.map(lambda x: (x['key1'] + ", " + x['key2'], x['key3'])).reduceByKey(lambda a, b: int(a) + int(b))