generate/insert Spark RDD 中元素的连续数字
generate/insert contiguous numbers to elements in a Spark RDD
假设我加载了一个 RDD
lines = sc.textFile('/test.txt')
而 RDD 就像 ['apple', 'orange', 'banana']
。然后我想生成RDD [(0, 'apple'), (1, 'orange'), (2, 'banana')]
.
我知道 indexed_lines = lines.zipWithIndex().map(lambda (x, y): ','.join([str(y), x])).collect()
可以做到
但是现在我有另一个RDDnew_lines = ['pineapple','blueberry']
,我想union
这两个RDD(indexed_lines和new_lines)构造[(0, 'apple'), (1, 'orange'), (2, 'banana'), (3, 'pineapple'), (4, 'blueberry')]
通知indexed_lines 已经存在,我不想更改其中的数据。
我正在尝试 zip and union
RDD
index = sc.parallelize(range(3, 5))
new_indexed_lines = new_lines.zip(index)
但它在这个 zip
转换中中断了。
知道为什么它坏了,是否有更聪明的方法来做到这一点?
谢谢。
这样的怎么样?
offset = lines.count()
new_indexed_lines = (new_lines
.zipWithIndex()
.map(lambda xi: (xi[1] + offset, xi[0])))
假设我加载了一个 RDD
lines = sc.textFile('/test.txt')
而 RDD 就像 ['apple', 'orange', 'banana']
。然后我想生成RDD [(0, 'apple'), (1, 'orange'), (2, 'banana')]
.
我知道 indexed_lines = lines.zipWithIndex().map(lambda (x, y): ','.join([str(y), x])).collect()
但是现在我有另一个RDDnew_lines = ['pineapple','blueberry']
,我想union
这两个RDD(indexed_lines和new_lines)构造[(0, 'apple'), (1, 'orange'), (2, 'banana'), (3, 'pineapple'), (4, 'blueberry')]
通知indexed_lines 已经存在,我不想更改其中的数据。
我正在尝试 zip and union
RDD
index = sc.parallelize(range(3, 5))
new_indexed_lines = new_lines.zip(index)
但它在这个 zip
转换中中断了。
知道为什么它坏了,是否有更聪明的方法来做到这一点?
谢谢。
这样的怎么样?
offset = lines.count()
new_indexed_lines = (new_lines
.zipWithIndex()
.map(lambda xi: (xi[1] + offset, xi[0])))