PySpark ReduceByKey
PySpark ReduceByKey
我已经尝试让它工作了一段时间,但每次都失败了。我有 2 个文件。其中一个是姓名列表:
Name1
Name2
Name3
Name4
另一个是与几年中每一天的名称关联的值列表:
['0.1,0.2,0.3,0.4',
'0.5,0.6,0.7,0.8',
'10,1000,0.2,5000'
...]
目标是得到如下输出:
Name1: [0.1,0.5,10]
Name2: [0.2,0.6,1000]
Name3:[0.3,0.7,0.2]
Name4:[0.4,0.8,5000]
然后为每个绘制直方图。我写了一个映射器,它创建一个元组列表,产生以下输出(这是一个 RDD 对象):
[[('Name1', [0.1]),('Name2', [0,2]),('Name3', [0.3]),('Name4', [0.4])],
[('Name1', [0.5]),('Name2', [0,6]),('Name3', [0.7]),('Name4', [0.8])],
[('Name1', [10]),('Name2', [1000]),('Name3', [0.8]),('Name4', [5000])]]
现在我需要将每个名称的所有值连接到一个列表中,但每个键映射,我尝试 returns 的值都是错误的结果。
您可以简单地遍历每个并使用 dict.setdefault()
从中创建一个字典。示例 -
>>> ll = [[('Name1', [0.1]),('Name2', [0,2]),('Name3', [0.3]),('Name4', [0.4])],
... [('Name1', [0.5]),('Name2', [0,6]),('Name3', [0.7]),('Name4', [0.8])],
... [('Name1', [10]),('Name2', [1000]),('Name3', [0.8]),('Name4', [5000])]]
>>> d = {}
>>> for i in ll:
... for tup in i:
... d.setdefault(tup[0],[]).extend(tup[1])
...
>>> pprint.pprint(d)
{'Name1': [0.1, 0.5, 10],
'Name2': [0, 2, 0, 6, 1000],
'Name3': [0.3, 0.7, 0.8],
'Name4': [0.4, 0.8, 5000]}
对于 Pyspark RDD 对象,尝试一个简单的 reduce 函数,例如 -
func = lambda x,y: x+y
然后将其发送到 reduceByKey
方法 -
object.reduceByKey(func)
根据评论,实际上 OP 有一个 RDD 对象列表(不是单个 RDD 对象),在这种情况下,您可以通过调用 .collect()
将 RDD 对象转换为列表,然后执行逻辑,然后你可以决定是否要将结果作为 python 字典或 RDD 对象,如果你想先。您可以调用 dict.items()
获取键值对并调用 sc.parrallelize
。示例 -
d = {}
for i in ll:
c = i.collect()
for tup in i:
d.setdefault(tup[0],[]).extend(tup[1])
rddobj = sc.parallelize(d.items())
我已经尝试让它工作了一段时间,但每次都失败了。我有 2 个文件。其中一个是姓名列表:
Name1
Name2
Name3
Name4
另一个是与几年中每一天的名称关联的值列表:
['0.1,0.2,0.3,0.4',
'0.5,0.6,0.7,0.8',
'10,1000,0.2,5000'
...]
目标是得到如下输出:
Name1: [0.1,0.5,10]
Name2: [0.2,0.6,1000]
Name3:[0.3,0.7,0.2]
Name4:[0.4,0.8,5000]
然后为每个绘制直方图。我写了一个映射器,它创建一个元组列表,产生以下输出(这是一个 RDD 对象):
[[('Name1', [0.1]),('Name2', [0,2]),('Name3', [0.3]),('Name4', [0.4])],
[('Name1', [0.5]),('Name2', [0,6]),('Name3', [0.7]),('Name4', [0.8])],
[('Name1', [10]),('Name2', [1000]),('Name3', [0.8]),('Name4', [5000])]]
现在我需要将每个名称的所有值连接到一个列表中,但每个键映射,我尝试 returns 的值都是错误的结果。
您可以简单地遍历每个并使用 dict.setdefault()
从中创建一个字典。示例 -
>>> ll = [[('Name1', [0.1]),('Name2', [0,2]),('Name3', [0.3]),('Name4', [0.4])],
... [('Name1', [0.5]),('Name2', [0,6]),('Name3', [0.7]),('Name4', [0.8])],
... [('Name1', [10]),('Name2', [1000]),('Name3', [0.8]),('Name4', [5000])]]
>>> d = {}
>>> for i in ll:
... for tup in i:
... d.setdefault(tup[0],[]).extend(tup[1])
...
>>> pprint.pprint(d)
{'Name1': [0.1, 0.5, 10],
'Name2': [0, 2, 0, 6, 1000],
'Name3': [0.3, 0.7, 0.8],
'Name4': [0.4, 0.8, 5000]}
对于 Pyspark RDD 对象,尝试一个简单的 reduce 函数,例如 -
func = lambda x,y: x+y
然后将其发送到 reduceByKey
方法 -
object.reduceByKey(func)
根据评论,实际上 OP 有一个 RDD 对象列表(不是单个 RDD 对象),在这种情况下,您可以通过调用 .collect()
将 RDD 对象转换为列表,然后执行逻辑,然后你可以决定是否要将结果作为 python 字典或 RDD 对象,如果你想先。您可以调用 dict.items()
获取键值对并调用 sc.parrallelize
。示例 -
d = {}
for i in ll:
c = i.collect()
for tup in i:
d.setdefault(tup[0],[]).extend(tup[1])
rddobj = sc.parallelize(d.items())