How to group and add up in Spark?
I have an RDD like this:
{"key1" : "fruit" , "key2" : "US" , "key3" : "1" }
{"key1" : "fruit" , "key2" : "US" , "key3" : "2" }
{"key1" : "vegetable" , "key2" : "US" , "key3" : "1" }
{"key1" : "fruit" , "key2" : "Japan" , "key3" : "3" }
{"key1" : "vegetable" , "key2" : "Japan" , "key3" : "3" }
My goal is to first group by key1, then group by key2, and finally sum up the key3 values.
I expect the final result to be:
key1 key2 key3
"fruit" , "US" , 3
"vegetable" , "US" , 1
"fruit" , "Japan" , 3
"vegetable" , "Japan" , 3
My code starts as below:
rdd_arm = rdd_arm.map(lambda x: x[1])
rdd_arm contains the key/value format shown above.
I am not sure where to go next. Could anyone help me?
I solved it myself.
I had to create a single key made up of the multiple keys, and then add up the values.
rdd_arm.map(lambda x: (x[0] + ", " + x[1], int(x[2]))).reduceByKey(lambda a, b: a + b)
The following question was helpful:
How to group by multiple keys in spark?
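For reference, here is a minimal runnable sketch of this approach. It assumes each record has already been mapped to a (key1, key2, key3) tuple (the exact shape of rdd_arm upstream is an assumption on my part) and casts key3 to int so that reduceByKey sums numbers instead of concatenating strings:

from pyspark import SparkContext

sc = SparkContext.getOrCreate()

# Hypothetical tuple-shaped records standing in for rdd_arm's real contents.
rows = [("fruit", "US", "1"), ("fruit", "US", "2"),
        ("vegetable", "US", "1"), ("fruit", "Japan", "3"),
        ("vegetable", "Japan", "3")]
rdd_arm = sc.parallelize(rows)

# Combine key1 and key2 into one composite key, cast key3 to int, then sum per key.
result = rdd_arm.map(lambda x: (x[0] + ", " + x[1], int(x[2]))) \
                .reduceByKey(lambda a, b: a + b)
print(result.collect())
# e.g. [('fruit, US', 3), ('vegetable, US', 1), ('fruit, Japan', 3), ('vegetable, Japan', 3)] (order may vary)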
Let's create your RDD:
In [1]: rdd_arm = sc.parallelize([{"key1" : "fruit" , "key2" : "US" , "key3" : "1" }, {"key1" : "fruit" , "key2" : "US" , "key3" : "2" }, {"key1" : "vegetable" , "key2" : "US" , "key3" : "1" }, {"key1" : "fruit" , "key2" : "Japan" , "key3" : "3" }, {"key1" : "vegetable" , "key2" : "Japan" , "key3" : "3" }])
In [2]: rdd_arm.collect()
Out[2]:
[{'key1': 'fruit', 'key2': 'US', 'key3': '1'},
{'key1': 'fruit', 'key2': 'US', 'key3': '2'},
{'key1': 'vegetable', 'key2': 'US', 'key3': '1'},
{'key1': 'fruit', 'key2': 'Japan', 'key3': '3'},
{'key1': 'vegetable', 'key2': 'Japan', 'key3': '3'}]
First, you have to create a new key, which will be the pair of key1 and key2. Its value will be key3, so you want to do something like this:
In [3]: new_rdd = rdd_arm.map(lambda x: (x['key1'] + ", " + x['key2'], x['key3']))
In [4]: new_rdd.collect()
Out[4]:
[('fruit, US', '1'),
('fruit, US', '2'),
('vegetable, US', '1'),
('fruit, Japan', '3'),
('vegetable, Japan', '3')]
Then, since we want to add up the values of duplicate keys, we simply call reduceByKey(), as follows:
In [5]: new_rdd = new_rdd.reduceByKey(lambda a, b: int(a) + int(b))
In [6]: new_rdd.collect()
Out[6]:
[('fruit, US', 3),
('fruit, Japan', '3'),
('vegetable, US', '1'),
('vegetable, Japan', '3')]
There you go!
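Note that in Out[6] the values for keys that appeared only once are still strings ('3', '1'): reduceByKey() only invokes the lambda when a key has at least two values to merge. A safer variant (my suggestion, not part of the original answer) is to cast to int in the map step, so every value ends up numeric:

new_rdd = rdd_arm.map(lambda x: (x['key1'] + ", " + x['key2'], int(x['key3']))).reduceByKey(lambda a, b: a + b)
# All values are now ints, e.g. [('fruit, US', 3), ('fruit, Japan', 3), ('vegetable, US', 1), ('vegetable, Japan', 3)] (order may vary)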
Of course, this can also be written as a one-liner, like so:
new_rdd = rdd_arm.map(lambda x: (x['key1'] + ", " + x['key2'], x['key3'])).reduceByKey(lambda a, b: int(a) + int(b))
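As a side note, a common alternative (not shown in the original answer) is to use a tuple (key1, key2) as the composite key instead of a concatenated string; tuples are hashable, and this keeps the two fields separate for any later processing:

new_rdd = rdd_arm.map(lambda x: ((x['key1'], x['key2']), int(x['key3']))).reduceByKey(lambda a, b: a + b)
# e.g. [(('fruit', 'US'), 3), (('vegetable', 'US'), 1), (('fruit', 'Japan'), 3), (('vegetable', 'Japan'), 3)] (order may vary)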