从 Pyspark 中的 RDD 中提取字典

Extracting a dictionary from an RDD in Pyspark

这是一道作业题:

我有一个 RDD 集合 os 元组。我还有 returns 来自每个输入元组的字典的功能。不知何故,reduce函数的opposite.

使用地图,我可以轻松地从 RDD 个元组转换为 RDD 个字典。但是,由于字典是 (key, value) 对的集合,我想将字典的 RDD 转换为包含每个字典内容的 (key, value) 元组的 RDD

这样,如果我的 RDD 包含 10 个元组,那么我得到一个 RDD 包含 10 个字典和 5 个元素(例如),最后我得到一个 RDD 50 个元组。

我假设这必须是 possible 但是,如何呢? (可能是我不知道这个操作英文怎么叫的问题)

我猜你想要的只是一个flatMap:

dicts = sc.parallelize([{"foo": 1, "bar": 2}, {"foo": 3, "baz": -1, "bar": 5}])
dicts.flatMap(lambda x: x.items())

flatMap 从 RDD 的一个元素获取一个函数到可迭代对象,然后连接结果。 Spark 上下文之外的相同类型操作的另一个名称是 mapcat:

>>> from toolz.curried import map, mapcat, concat, pipe
>>> from itertools import repeat
>>> pipe(range(4), mapcat(lambda i: repeat(i, i + 1)), list)
[0, 1, 1, 2, 2, 2, 3, 3, 3, 3]

或一步步来:

>>> pipe(range(4), map(lambda i: repeat(i, i + 1)), concat, list)
[0, 1, 1, 2, 2, 2, 3, 3, 3, 3]

同样的事情使用 itertools.chain

>>> from itertools import chain
>>> pipe((repeat(i, i + 1) for i in  range(4)), chain.from_iterable, list)
>>> [0, 1, 1, 2, 2, 2, 3, 3, 3, 3]

我的 2 美分:

有一个名为 "collectAsMap" 的 PairRDD 函数,returns 来自 RDD 的字典。

举个例子:

sample = someRDD.sample(0, 0.0001, 0)
sample_dict = sample.collectAsMap()
print sample.collect()
print sample_dict

[('hi', 4123.0)]
{'hi': 4123.0}

文档here

希望对您有所帮助! 问候!