如何批量收集RDD中的元素
How to collect elements in a RDD by batches
我有一个 pyspark RDD,它有大约 200 万个元素。我无法一次收集它们,因为它会导致 OutOfMemoryError
异常。
如何批量领取?
这是一个潜在的解决方案,但我怀疑有更好的方法:收集一批(使用 take
、https://spark.apache.org/docs/3.1.2/api/python/reference/api/pyspark.RDD.take.html#pyspark.RDD.take), then remove all elements from the RDD in that batch (using filter
, https://spark.apache.org/docs/3.1.2/api/python/reference/api/pyspark.RDD.filter.html#pyspark.RDD.filter,但我怀疑有更好的方法),重复直到没有元素收藏了。
我不确定这是一个好的解决方案,但您可以使用索引压缩您的 rdd,然后过滤该索引以分批收集项目:
big_rdd = spark.sparkContext.parallelize([str(i) for i in range(0, 100)])
big_rdd_with_index = big_rdd.zipWithIndex()
batch_size = 10
batches = []
for i in range(0, 100, batch_size):
batches.append(big_rdd_with_index.filter(lambda element: i <= element[1] < i + batch_size).map(lambda element: element[0]).collect())
for l in batches:
print(l)
输出:
['0', '1', '2', '3', '4', '5', '6', '7', '8', '9']
['10', '11', '12', '13', '14', '15', '16', '17', '18', '19']
['20', '21', '22', '23', '24', '25', '26', '27', '28', '29']
['30', '31', '32', '33', '34', '35', '36', '37', '38', '39']
['40', '41', '42', '43', '44', '45', '46', '47', '48', '49']
['50', '51', '52', '53', '54', '55', '56', '57', '58', '59']
['60', '61', '62', '63', '64', '65', '66', '67', '68', '69']
['70', '71', '72', '73', '74', '75', '76', '77', '78', '79']
['80', '81', '82', '83', '84', '85', '86', '87', '88', '89']
['90', '91', '92', '93', '94', '95', '96', '97', '98', '99']
我有一个 pyspark RDD,它有大约 200 万个元素。我无法一次收集它们,因为它会导致 OutOfMemoryError
异常。
如何批量领取?
这是一个潜在的解决方案,但我怀疑有更好的方法:收集一批(使用 take
、https://spark.apache.org/docs/3.1.2/api/python/reference/api/pyspark.RDD.take.html#pyspark.RDD.take), then remove all elements from the RDD in that batch (using filter
, https://spark.apache.org/docs/3.1.2/api/python/reference/api/pyspark.RDD.filter.html#pyspark.RDD.filter,但我怀疑有更好的方法),重复直到没有元素收藏了。
我不确定这是一个好的解决方案,但您可以使用索引压缩您的 rdd,然后过滤该索引以分批收集项目:
big_rdd = spark.sparkContext.parallelize([str(i) for i in range(0, 100)])
big_rdd_with_index = big_rdd.zipWithIndex()
batch_size = 10
batches = []
for i in range(0, 100, batch_size):
batches.append(big_rdd_with_index.filter(lambda element: i <= element[1] < i + batch_size).map(lambda element: element[0]).collect())
for l in batches:
print(l)
输出:
['0', '1', '2', '3', '4', '5', '6', '7', '8', '9']
['10', '11', '12', '13', '14', '15', '16', '17', '18', '19']
['20', '21', '22', '23', '24', '25', '26', '27', '28', '29']
['30', '31', '32', '33', '34', '35', '36', '37', '38', '39']
['40', '41', '42', '43', '44', '45', '46', '47', '48', '49']
['50', '51', '52', '53', '54', '55', '56', '57', '58', '59']
['60', '61', '62', '63', '64', '65', '66', '67', '68', '69']
['70', '71', '72', '73', '74', '75', '76', '77', '78', '79']
['80', '81', '82', '83', '84', '85', '86', '87', '88', '89']
['90', '91', '92', '93', '94', '95', '96', '97', '98', '99']