如何在 pyspark 中压缩多个 RDD?

How to zip multiple RDDs in pyspark?

在spark中有zipPartitions将多个RDD合并为一个。但是,pyspark RDD 没有这样的方法。如果我多次使用 zip 那么我会为每个合并的 rdd 创建一个新的数据帧,这不是我想要的。

如何在 pyspark 中将多个 RDD 压缩为一个?

好问题。在 PySpark 中引入 zipPartitionsproposed in 2016, but as you can read among comments, they never managed to find a good compromise between performances and solution complexity. The issue is now closed but I do not think it will be reopened in the near future. This 是 Joseph E. Gonzalez 提出的解决方案。


使用那个API最快的方法是自己写(性能当然不会那么好)。一个非常天真的 zipPartitions 实现是:

def zipPartitions(rdd1, rdd2, func):
    rdd1_numPartitions = rdd1.getNumPartitions()
    rdd2_numPartitions = rdd2.getNumPartitions()
    assert rdd1_numPartitions == rdd2_numPartitions, "rdd1 and rdd2 must have the same number of partitions"
    
    paired_rdd1 = rdd1.mapPartitionsWithIndex(lambda index, it: ((index, list(it)),))
    paired_rdd2 = rdd2.mapPartitionsWithIndex(lambda index, it: ((index, list(it)),))
    
    zipped_rdds = paired_rdd1.join(paired_rdd2, numPartitions=rdd1_numPartitions)\
        .flatMap(lambda x: func(x[1][0], x[1][1]))
    
    return zipped_rdds

您可以使用以下方法进行测试:

rdd1 = sc.parallelize(range(30), 3)
rdd2 = sc.parallelize(range(50), 3)

zipPartitions(rdd1, rdd2, lambda it1, it2: itertools.zip_longest(it1, it2))\
    .glom().collect()

参数很容易理解,它们按顺序是第一个 rdd,第二个 rdd 和一个接受 2 个分区迭代器的函数,每个 rdd 一个。 使用 assert rdd1_numPartitions == rdd2_numPartitions 我确保两个 rdd 具有相同数量的分区,这也是 Scala 版本的先决条件。 然后我在两个rdds上使用mapPartitionsWithIndex来转换,例如,一个有两个分区的rdd,from:

[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]

[(0, [0, 1, 2, 3, 4]), (1, [5, 6, 7, 8, 9])]

注意:不幸的是,从itlist(it)的转换是必要的,因为在大多数python实现中你不能pickle生成器, it 参数是一个生成器。有一个例外允许您将 it 转换为列表,这是 pyspark 使用 very clever optimization 处理的情况,我说的是从 range() 创建的 rdd。事实上,考虑到前面的例子,

range(10)

变成

[(0, range(0, 5)), (1, range(5, 10))]

接下来我可以 join 分区索引上的两个新 rdds。 numPartitions可以很容易的预测出来,因为我们之前断言两个rdd的分区数一定是一样的,所以是一对一的关系。最后,我可以应用传递的函数并展平分区结果列表。