PySpark 按值分解键并为 LDA 模型保留重复项

PySpark Exploding Key by Value and Preserve Duplicates for LDA Model

我有一个元组的 RDD。我想分解键值对并保留重复项。本质上是键 x 值。我想将这个数组的 RDD 提供给 LDA 模型。我将在下面说明一些代码,在此先感谢您:

当前状态

>>> rdd4.take(2)
[ [(u'11394071', 1), (u'11052103', 1), (u'11052101', 1)],
[(u'11847272', 2), (u'999999', 1), (u'11847272', 3)] ]

首选州

>>> rdd4.take(2)
[ ['11394071','11052103','11052101'],
['11847272', '11847272','999999','11847272','11847272','11847272'] ]

不在我的脑海里

.map(lambda x: x[0])

但是不要靠近有火花的机器

import itertools

orig_list = rdd4.take(2)

result = [list(itertools.chain.from_iterable([[item] * count for item, count in orig_list_item]))
          for orig_list_item in orig_list]

print result的输出:

[['11394071', '11052103', '11052101'],  
 ['11847272', '11847272', '999999', '11847272', '11847272', '11847272']]

解释:

就是这么叫的(有点复杂)列表理解 - 从右边读:

for orig_list_item in orig_list

很清楚,不是吗?对于您 orig_list 中的每个 orig_list_item(它本身就是一个列表),我们将做同样的事情:

for item, count in orig_list_item

我们将解压其中每一对的各个部分(分别为它们命名 itemcount - 例如,对于第一对,它将是 item == u'11394071'count == 1).

现在我们创建这个项目的列表

[item]

并将重复 count

[item] * count

不幸的是,我们将获得一个列表列表,例如(来自您的第二个更有趣的子列表)

[['11847272', '11847272'], ['999999'], ['11847272', '11847272', '11847272']]

所以我们需要将它们组合(连接)成一个简单列表——导入模数itertools[=30的函数(class方法) =]

itertools.chain.from_iterable()

会成功的。

但是 - 不幸的是 - 这个函数在更高版本的 Python 中(我认为来自 Python 2.7)产生一个 generator,而不是 list - 因此我们最终将使用内置函数 list() 来获取所需的列表。

不使用列表理解的解决方案,可能更清楚:

import itertools

orig_list = rdd4.take(2)

result = []
for orig_list_item in orig_list:
    inner_result = []
    for item, count in orig_list_item:
        inner_result.append([item] * count)
    inner_result = itertools.chain.from_iterable(inner_result)
    inner_result = list(inner_result)
    result.append(inner_result)

这个解释实际上和我的另一个答案一样。

使用函数并保留 RDD:

def explode_list(data):
    inner_result = []
    for item, count in data:
        inner_result.append([item] * count)
    flatten_result = [xx for yy in inner_result for xx in yy]
    return flatten_result