过滤然后合并时避免重新分区成本
Avoid repartition costs when filtering and then coalescing
我正在 pyspark 中对 (x,y) 点的 RDD 实施范围查询。我将 xy space 划分为 16*16 网格(256 个单元格),并将我的 RDD 中的每个点分配给这些单元格之一。
gridMappedRDD 是一个 PairRDD:(cell_id, Point object)
我将这个 RDD 划分为 256 个分区,使用:
gridMappedRDD.partitionBy(256)
范围查询是一个矩形框。我有一个用于我的 Grid 对象的方法,它可以 return 与查询范围重叠的单元格 ID 列表。所以,我用它作为过滤器来修剪不相关的单元格:
filteredRDD = gridMappedRDD.filter(lambda x: x[0] in candidateCells)
但是问题是,当运行查询然后收集结果的时候,所有的256个分区都被求值了;为每个分区创建一个任务。
为了避免这个问题,我尝试将 filteredRDD 合并到 candidateCell 列表的长度,我希望这可以解决问题。
filteredRDD.coalesce(len(candidateCells))
事实上,生成的 RDD 有 len(candidateCells)
个分区,但分区与 gridMappedRDD
不同。
如 coalesce 文档中所述,shuffle
参数为 False,不应在分区之间执行随机播放,但我可以看到(在 glom() 的帮助下)情况并非如此。
例如,在 coalesce(4)
和 candidateCells=[62, 63, 78, 79]
之后,分区是这样的:
[[(62, P), (62, P) .... , (63, P)],
[(78, P), (78, P) .... , (79, P)],
[], []
]
实际上,通过合并,我有一个随机读取,它等于每个任务的整个数据集的大小,这需要很长时间。我需要的是一个 RDD,其中只有与 candidateCells 中的单元格相关的分区,没有任何洗牌。
所以,我的问题是,是否可以在不重新洗牌的情况下仅过滤某些分区?对于上面的示例,我的 filteredRDD 将有 4 个分区,其数据与原始 RDD 的第 62、63、78、79 个分区完全相同。这样做,查询可以定向到仅影响分区。
你在这里做出了一些错误的假设:
- 随机播放与
coalesce
无关(coalesce
在这里也没有用)。这是由 partitionBy
引起的。根据定义分区需要随机播放。
- 分区不能用于优化
filter
。 Spark 对您使用的函数一无所知(它是一个黑盒子)。
- 分区不会唯一地将键映射到分区。多个键可以放在同一个分区上 -
你能做什么:
如果生成的子集是小的重新分区并为每个键应用 lookup
:
from itertools import chain
partitionedRDD = gridMappedRDD.partitionBy(256)
chain.from_iterable(
((c, x) for x in partitionedRDD.lookup(c))
for c in candidateCells
)
如果数据量大可以尝试跳过扫描分区(任务数量不会改变,但可以缩短部分任务):
candidatePartitions = [
partitionedRDD.partitioner.partitionFunc(c) for c in candidateCells
]
partitionedRDD.mapPartitionsWithIndex(
lambda i, xs: (x for x in xs if x[0] in candidateCells) if i in candidatePartitions else []
)
这两种方法只有在执行多次时才有意义"lookups"。如果是一次性操作,最好进行线性过滤:
- 它比洗牌和重新分区便宜。
- 如果初始数据均匀分布,下游处理将能够更好地利用可用资源。
我正在 pyspark 中对 (x,y) 点的 RDD 实施范围查询。我将 xy space 划分为 16*16 网格(256 个单元格),并将我的 RDD 中的每个点分配给这些单元格之一。
gridMappedRDD 是一个 PairRDD:(cell_id, Point object)
我将这个 RDD 划分为 256 个分区,使用:
gridMappedRDD.partitionBy(256)
范围查询是一个矩形框。我有一个用于我的 Grid 对象的方法,它可以 return 与查询范围重叠的单元格 ID 列表。所以,我用它作为过滤器来修剪不相关的单元格:
filteredRDD = gridMappedRDD.filter(lambda x: x[0] in candidateCells)
但是问题是,当运行查询然后收集结果的时候,所有的256个分区都被求值了;为每个分区创建一个任务。
为了避免这个问题,我尝试将 filteredRDD 合并到 candidateCell 列表的长度,我希望这可以解决问题。
filteredRDD.coalesce(len(candidateCells))
事实上,生成的 RDD 有 len(candidateCells)
个分区,但分区与 gridMappedRDD
不同。
如 coalesce 文档中所述,shuffle
参数为 False,不应在分区之间执行随机播放,但我可以看到(在 glom() 的帮助下)情况并非如此。
例如,在 coalesce(4)
和 candidateCells=[62, 63, 78, 79]
之后,分区是这样的:
[[(62, P), (62, P) .... , (63, P)],
[(78, P), (78, P) .... , (79, P)],
[], []
]
实际上,通过合并,我有一个随机读取,它等于每个任务的整个数据集的大小,这需要很长时间。我需要的是一个 RDD,其中只有与 candidateCells 中的单元格相关的分区,没有任何洗牌。 所以,我的问题是,是否可以在不重新洗牌的情况下仅过滤某些分区?对于上面的示例,我的 filteredRDD 将有 4 个分区,其数据与原始 RDD 的第 62、63、78、79 个分区完全相同。这样做,查询可以定向到仅影响分区。
你在这里做出了一些错误的假设:
- 随机播放与
coalesce
无关(coalesce
在这里也没有用)。这是由partitionBy
引起的。根据定义分区需要随机播放。 - 分区不能用于优化
filter
。 Spark 对您使用的函数一无所知(它是一个黑盒子)。 - 分区不会唯一地将键映射到分区。多个键可以放在同一个分区上 -
你能做什么:
如果生成的子集是小的重新分区并为每个键应用
lookup
:from itertools import chain partitionedRDD = gridMappedRDD.partitionBy(256) chain.from_iterable( ((c, x) for x in partitionedRDD.lookup(c)) for c in candidateCells )
如果数据量大可以尝试跳过扫描分区(任务数量不会改变,但可以缩短部分任务):
candidatePartitions = [ partitionedRDD.partitioner.partitionFunc(c) for c in candidateCells ] partitionedRDD.mapPartitionsWithIndex( lambda i, xs: (x for x in xs if x[0] in candidateCells) if i in candidatePartitions else [] )
这两种方法只有在执行多次时才有意义"lookups"。如果是一次性操作,最好进行线性过滤:
- 它比洗牌和重新分区便宜。
- 如果初始数据均匀分布,下游处理将能够更好地利用可用资源。