过滤然后合并时避免重新分区成本

Avoid repartition costs when filtering and then coalescing

我正在 pyspark 中对 (x,y) 点的 RDD 实施范围查询。我将 xy space 划分为 16*16 网格(256 个单元格),并将我的 RDD 中的每个点分配给这些单元格之一。 gridMappedRDD 是一个 PairRDD:(cell_id, Point object)

我将这个 RDD 划分为 256 个分区,使用:

gridMappedRDD.partitionBy(256)

范围查询是一个矩形框。我有一个用于我的 Grid 对象的方法,它可以 return 与查询范围重叠的单元格 ID 列表。所以,我用它作为过滤器来修剪不相关的单元格:

filteredRDD = gridMappedRDD.filter(lambda x: x[0] in candidateCells)

但是问题是,当运行查询然后收集结果的时候,所有的256个分区都被求值了;为每个分区创建一个任务。

为了避免这个问题,我尝试将 filteredRDD 合并到 candidateCell 列表的长度,我希望这可以解决问题。

filteredRDD.coalesce(len(candidateCells))

事实上,生成的 RDD 有 len(candidateCells) 个分区,但分区与 gridMappedRDD 不同。

如 coalesce 文档中所述,shuffle 参数为 False,不应在分区之间执行随机播放,但我可以看到(在 glom() 的帮助下)情况并非如此。

例如,在 coalesce(4)candidateCells=[62, 63, 78, 79] 之后,分区是这样的:

[[(62, P), (62, P) .... , (63, P)],
 [(78, P), (78, P) .... , (79, P)],
 [], []
]

实际上,通过合并,我有一个随机读取,它等于每个任务的整个数据集的大小,这需要很长时间。我需要的是一个 RDD,其中只有与 candidateCells 中的单元格相关的分区,没有任何洗牌。 所以,我的问题是,是否可以在不重新洗牌的情况下仅过滤某些分区?对于上面的示例,我的 filteredRDD 将有 4 个分区,其数据与原始 RDD 的第 62、63、78、79 个分区完全相同。这样做,查询可以定向到仅影响分区。

你在这里做出了一些错误的假设:

  • 随机播放与coalesce无关(coalesce在这里也没有用)。这是由 partitionBy 引起的。根据定义分区需要随机播放。
  • 分区不能用于优化filter。 Spark 对您使用的函数一无所知(它是一个黑盒子)。
  • 分区不会唯一地将键映射到分区。多个键可以放在同一个分区上 -

你能做什么:

  • 如果生成的子集是小的重新分区并为每个键应用 lookup

    from itertools import chain
    
    partitionedRDD = gridMappedRDD.partitionBy(256)
    
    chain.from_iterable(
        ((c, x) for x in partitionedRDD.lookup(c)) 
        for c in candidateCells
    )
    
  • 如果数据量大可以尝试跳过扫描分区(任务数量不会改变,但可以缩短部分任务):

    candidatePartitions = [
        partitionedRDD.partitioner.partitionFunc(c) for c in candidateCells
    ]
    
    partitionedRDD.mapPartitionsWithIndex(
        lambda i, xs: (x for x in xs if x[0] in candidateCells) if i in candidatePartitions else []
    )
    

这两种方法只有在执行多次时才有意义"lookups"。如果是一次性操作,最好进行线性过滤:

  • 它比洗牌和重新分区便宜。
  • 如果初始数据均匀分布,下游处理将能够更好地利用可用资源。