PySpark ReduceByKey

PySpark ReduceByKey

我已经尝试让它工作了一段时间,但每次都失败了。我有 2 个文件。其中一个是姓名列表:

Name1
Name2
Name3
Name4

另一个是与几年中每一天的名称关联的值列表:

['0.1,0.2,0.3,0.4',
 '0.5,0.6,0.7,0.8', 
 '10,1000,0.2,5000'
  ...]

目标是得到如下输出:

Name1: [0.1,0.5,10]
Name2: [0.2,0.6,1000]
Name3:[0.3,0.7,0.2]
Name4:[0.4,0.8,5000]

然后为每个绘制直方图。我写了一个映射器,它创建一个元组列表,产生以下输出(这是一个 RDD 对象):

[[('Name1', [0.1]),('Name2', [0,2]),('Name3', [0.3]),('Name4', [0.4])],
[('Name1', [0.5]),('Name2', [0,6]),('Name3', [0.7]),('Name4', [0.8])],
[('Name1', [10]),('Name2', [1000]),('Name3', [0.8]),('Name4', [5000])]]

现在我需要将每个名称的所有值连接到一个列表中,但每个键映射,我尝试 returns 的值都是错误的结果。

您可以简单地遍历每个并使用 dict.setdefault() 从中创建一个字典。示例 -

>>> ll = [[('Name1', [0.1]),('Name2', [0,2]),('Name3', [0.3]),('Name4', [0.4])],
... [('Name1', [0.5]),('Name2', [0,6]),('Name3', [0.7]),('Name4', [0.8])],
... [('Name1', [10]),('Name2', [1000]),('Name3', [0.8]),('Name4', [5000])]]
>>> d = {}
>>> for i in ll:
...     for tup in i:
...             d.setdefault(tup[0],[]).extend(tup[1])
...
>>> pprint.pprint(d)
{'Name1': [0.1, 0.5, 10],
 'Name2': [0, 2, 0, 6, 1000],
 'Name3': [0.3, 0.7, 0.8],
 'Name4': [0.4, 0.8, 5000]}

对于 Pyspark RDD 对象,尝试一个简单的 reduce 函数,例如 -

func = lambda x,y: x+y

然后将其发送到 reduceByKey 方法 -

object.reduceByKey(func)

根据评论,实际上 OP 有一个 RDD 对象列表(不是单个 RDD 对象),在这种情况下,您可以通过调用 .collect() 将 RDD 对象转换为列表,然后执行逻辑,然后你可以决定是否要将结果作为 python 字典或 RDD 对象,如果你想先。您可以调用 dict.items() 获取键值对并调用 sc.parrallelize 。示例 -

d = {}
for i in ll:
    c = i.collect()
    for tup in i:
            d.setdefault(tup[0],[]).extend(tup[1])

rddobj = sc.parallelize(d.items())