Spark:控制分区以减少随机播放

Spark : Control partitioning to reduce shuffle

我正在努力思考 partitioning Spark 中数据帧的不同方式,以减少特定管道上的改组数量。

这是我正在处理的数据框,它包含超过 40 亿行和 80 列:

+-----+-------------------+-----------+
|  msn|          timestamp| Flight_Id |
+-----+-------------------+-----------+
|50020|2020-08-22 19:16:00|       72.0|
|50020|2020-08-22 19:15:00|       84.0|
|50020|2020-08-22 19:14:00|       96.0|
|50020|2020-08-22 19:13:00|       84.0|
|50020|2020-08-22 19:12:00|       84.0|
|50020|2020-08-22 19:11:00|       84.0|
|50020|2020-08-22 19:10:00|       84.0|
|50020|2020-08-22 19:09:00|       84.0|
|50020|2020-08-22 19:08:00|       84.0|
|50020|2020-08-22 19:07:00|       84.0|
|50020|2020-08-22 19:06:00|       84.0|
|50020|2020-08-22 19:05:00|       84.0|
|50020|2020-08-22 19:04:00|       84.0|
|50020|2020-08-22 19:03:00|       84.0|
|50020|2020-08-22 19:02:00|       84.0|
|50020|2020-08-22 19:01:00|       84.0|
|50020|2020-08-22 19:00:00|       84.0|
|50020|2020-08-22 18:59:00|       84.0|
|50020|2020-08-22 18:58:00|       84.0|
|50020|2020-08-22 18:57:00|       84.0|
+-----+-------------------+-----------+

这代表了不同飞机(总共 41 架飞机)的时间序列集合。 我只对这些数据做了两件事:

  1. 使用 MSNFlight_ID 划分的 window 以及 timestamp 划分的 order By 进行过滤以保留每个航班的最后 30 分钟。
  2. 在剩余的列中,计算 meanstdev 并对数据进行归一化。

我有 32 个执行程序,每个执行程序有 12g 内存,作业在 运行 30 小时后崩溃,并显示以下消息:

The driver running the job crashed, ran out of memory, or otherwise became unresponsive while it was running.

查看查询计划我注意到我有超过 300 个步骤,其中超过 60 个涉及改组(所有步骤物理计划看起来完全相同):

AdaptiveSparkPlan(isFinalPlan=false)
+- CollectLimit 1
   +- HashAggregate(keys=[], functions=[avg(3546001_421#213), stddev_samp(3546001_421#213)], output=[avg(3546001_421)#10408, stddev_samp(3546001_421)#10417])
      +- Exchange SinglePartition, true
         +- HashAggregate(keys=[], functions=[partial_avg(3546001_421#213), partial_stddev_samp(3546001_421#213)], output=[sum#10479, count#10480L, n#10423, avg#10424, m2#10425])
            +- Project [3546001_421#213]
               +- Filter (isnotnull(rank#10238) && (rank#10238 <= 1800))
                  +- Window [rank(timestamp#10081) windowspecdefinition(Flight_Id_Int#209, timestamp#10081 DESC NULLS LAST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS rank#10238], [Flight_Id_Int#209], [timestamp#10081 DESC NULLS LAST]
                     +- Sort [Flight_Id_Int#209 ASC NULLS FIRST, timestamp#10081 DESC NULLS LAST], false, 0
                        +- ShuffleQueryStage 0
                           +- Exchange hashpartitioning(Flight_Id_Int#209, 200), true
                              +- Union
                                 :- *(1) Project [Flight_Id_Int#209, cast((cast(timestamp#212L as double) / 1.0E9) as timestamp) AS timestamp#10081, 3546001_421#213]

我有一种强烈的感觉,首先按 msn 进行分区将有助于减少混洗的数量,因为大部分工作都在 msn 级别。

我的问题是 如何在我的代码中我应该重新分区?我应该使用 repartitionrepartition 和密钥 hash partitioning 我一直在阅读关于这个不同的分区程序的文档,我对它们的使用感到困惑,如果这真的是我问题的解决方案.

谢谢。

编辑 1:

这是合理的计划:

GlobalLimit 1
+- LocalLimit 1
   +- Aggregate [avg(3566000_421#214) AS avg(3566000_421)#10594, stddev_samp(3566000_421#214) AS stddev_samp(3566000_421)#10603]
      +- Project [3566000_421#214]
         +- Filter (isnotnull(rank#10238) && (rank#10238 <= 1800))
            +- Window [rank(timestamp#10081) windowspecdefinition(msn#208, Flight_Id_Int#209, timestamp#10081 DESC NULLS LAST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS rank#10238], [msn#208, Flight_Id_Int#209], [timestamp#10081 DESC NULLS LAST]
               +- Union
                  :- Project [msn#208, Flight_Id_Int#209, cast((cast(timestamp#212L as double) / 1.0E9) as timestamp) AS timestamp#10081, 3566000_421#214]

这是我从存储数据的数据湖中提取数据的代码部分。 仅供参考,这是通过自定义 API 使用名为 FoundryTS 的库完成的。重要的是,在我调用 to_dataframe() 方法之前,不会收集任何数据。我循环遍历每个 msn 以避免调用太大,然后我将所有数据帧与 unionByName

合并
# Loop over MSN to extract timeseries
        df = []
        for msn in msn_range:
            search_results = (SeriesMetadata.M_REPORT_NUMBER == report_number) & (SeriesMetadata.M_AIRCRAFT == msn)

            # Create the intervals to split TimeSeries extract by flight for each MSN
            Start_int = list(df1.where(F.col("msn") == msn).select("Start").toPandas()["Start"])
            End_int = list(df1.where(F.col("msn") == msn).select("End").toPandas()["End"])
            flight_id = list(df1.where(F.col("msn") == msn).select("id_cmsReport").toPandas()["id_cmsReport"])

            flights_interval = [Interval(
                start, end, name=flight_Id
                ) for start, end, flight_Id in zip(
                Start_int, End_int, flight_id
                )]

            """ Collect all the series in a node collections """
            output = fts.search.series(
                search_results,
                object_types=["export-control-us-ear99-a220-dal-airline-series"])\
                .map_by(FF.interpolate(
                    before='nearest',
                    internal='nearest',
                    after='nearest',
                    frequency=frequency,
                    rename_columns_by=lambda x: x.metadata["parameter_id"] + "_" + x.metadata["report_number"]),
                    keys='msn') \
                .map_intervals(flights_interval, interval_name='Flight_Id_Int')\
                .map(FF.time_range(period_start, period_end))\
                .to_dataframe()  # !!!!  numPartitions=32  Foundry Doc : #partition = #executors see if it triggers OOM error

            df.append(output)

        output = df[0]
        for df in df[1:]:
            output = output.unionByName(df)  # Same as union but matches name instead of columns order.

        # Repartition by msn to improve latter calculation
        N = len(msn_range)
        output.repartition(N, 'msn')

“驱动程序 运行 作业崩溃,运行 内存不足,或者在 运行 时变得无响应。”

您需要解决的第一个问题是增加驱动程序(不是执行程序)的内存。spark 中的默认驱动程序内存通常很低,以至于会在许多查询中崩溃。

“我的问题是我应该在我的代码中如何以及在何处重新分区”

Spark 已经根据需要完成了添加重新分区的工作。您很可能只会通过在执行中途手动重新分区数据来创建额外的工作。一个潜在的优化是将数据存储在一个分桶的 table 中,但这只会潜在地删除第一个交换,并且只有当你的分桶列与第一个交换的哈希分区完全匹配时。

“查看查询计划我发现我有 300 多个步骤”

你上面描述的并没有走300步。这里似乎有些不对劲。您的优化逻辑计划是什么样的? mean 和 std 应该只需要扫描 -> partitial agg -> exchange -> final agg。在您提供的查询计划中,您似乎故意只查看最后 1600 个数据点而不是最后 30m。您是要执行 window 函数而不是简单的聚合(也称为分组依据)吗?

编辑:

for msn in msn_range:

IMO 这可能是您问题的一部分。此 for 循环导致执行计划非常大,这可能是您在驱动程序上遇到 OOM 问题的原因。您也许可以将它 运行slate 变成更友好的东西,并且不会在驱动程序上做太多工作,将 forloop 转换为 spark.paralellize(...).map( /你的代码/)

对于那些可能有帮助的人,

以下是我在分区时出错的地方:

  1. .to_dataframe() :在我们的云平台Spark中默认创建200个分区。因此,通过遍历 40 msn,我生成了 40 x 200 分区。我最终有很多小任务要管理。
  2. .repartition() :由于我在 msn 上使用了 WindowpartitionBy,我虽然使用 msn 重新分区会加快这一步。但它引入了我的分区的完全洗牌。

结果:根据 Spark Job Tracker 和 > 55k 任务,随机写入 59 GB。任务占用了一些开销,这可以解释驱动程序崩溃的原因。

我做了什么让它起作用:

  1. 我去掉了 Window 函数

通过在我从 DataLake 获取数据之前在过程中进行早期过滤。我直接提取了我需要的部分航班。因此,在完全相同的部分的物理计划中更少 Exchange Partition

这是更新后的实体计划:

AdaptiveSparkPlan(isFinalPlan=false)
+- CollectLimit 1
   +- HashAggregate(keys=[], functions=[avg(3565000_421#213), stddev_samp(3565000_421#213)], output=[avg(3565000_421)#10246, stddev_samp(3565000_421)#10255])
      +- ShuffleQueryStage 0
         +- Exchange SinglePartition, true
            +- *(43) HashAggregate(keys=[], functions=[partial_avg(3565000_421#213), partial_stddev_samp(3565000_421#213)], output=[sum#10317, count#10318L, n#10261, avg#10262, m2#10263])
               +- Union
                  :- *(1) Project [3565000_421#213]
                  :  +- *(1) Scan ExistingRDD[msn#208,Flight_Id_Int#209,Flight_Id_Int.start#210L
  1. 我减少了分区数量:

我在 40 个 msn 的每个 .to_dataframe() 调用中任意将其设置为 5。

24 小时后构建成功。 1.1MB 随机写入和 >27 个任务。

正如@Andrew Long 指出的那样,由于 for 循环,它可能不是最佳选择。我仍在为 32 个执行程序生成至少 200 个分区,最终要管理 > 27k 个任务。

最后:

作为最后一步,我通过依赖底层 API 来获取 1 个大数据帧中的数据并强制分区为 32,从而摆脱了 for 循环。 在撰写本文时构建仍然是 运行,我将使用结果编辑 post。 但确实需要管理的任务更少(减少了 4 倍)。

编辑 1 - 更新

很高兴地报告,通过摆脱 for 循环并将数据帧划分为 64 个分区(32 个执行程序 x 2 个核心),我能够在 11 小时(而不是 24 小时)内完成相同的工作,而 1.9MB仅随机写入和 5k 任务。

PS:我在上面提到了 32 个(而不是 64 个)分区,但是 32 个分区没有成功并且并行度不是最优的(< 20)所以它花费了更长的时间并且我有空闲的执行者. 64 对我来说似乎是最佳选择。