Spark:控制分区以减少随机播放
Spark : Control partitioning to reduce shuffle
我正在努力思考 partitioning
Spark 中数据帧的不同方式,以减少特定管道上的改组数量。
这是我正在处理的数据框,它包含超过 40 亿行和 80 列:
+-----+-------------------+-----------+
| msn| timestamp| Flight_Id |
+-----+-------------------+-----------+
|50020|2020-08-22 19:16:00| 72.0|
|50020|2020-08-22 19:15:00| 84.0|
|50020|2020-08-22 19:14:00| 96.0|
|50020|2020-08-22 19:13:00| 84.0|
|50020|2020-08-22 19:12:00| 84.0|
|50020|2020-08-22 19:11:00| 84.0|
|50020|2020-08-22 19:10:00| 84.0|
|50020|2020-08-22 19:09:00| 84.0|
|50020|2020-08-22 19:08:00| 84.0|
|50020|2020-08-22 19:07:00| 84.0|
|50020|2020-08-22 19:06:00| 84.0|
|50020|2020-08-22 19:05:00| 84.0|
|50020|2020-08-22 19:04:00| 84.0|
|50020|2020-08-22 19:03:00| 84.0|
|50020|2020-08-22 19:02:00| 84.0|
|50020|2020-08-22 19:01:00| 84.0|
|50020|2020-08-22 19:00:00| 84.0|
|50020|2020-08-22 18:59:00| 84.0|
|50020|2020-08-22 18:58:00| 84.0|
|50020|2020-08-22 18:57:00| 84.0|
+-----+-------------------+-----------+
这代表了不同飞机(总共 41 架飞机)的时间序列集合。
我只对这些数据做了两件事:
- 使用
MSN
和 Flight_ID
划分的 window 以及 timestamp
划分的 order By
进行过滤以保留每个航班的最后 30 分钟。
- 在剩余的列中,计算
mean
和 stdev
并对数据进行归一化。
我有 32 个执行程序,每个执行程序有 12g 内存,作业在 运行 30 小时后崩溃,并显示以下消息:
The driver running the job crashed, ran out of memory, or otherwise became unresponsive while it was running.
查看查询计划我注意到我有超过 300 个步骤,其中超过 60 个涉及改组(所有步骤物理计划看起来完全相同):
AdaptiveSparkPlan(isFinalPlan=false)
+- CollectLimit 1
+- HashAggregate(keys=[], functions=[avg(3546001_421#213), stddev_samp(3546001_421#213)], output=[avg(3546001_421)#10408, stddev_samp(3546001_421)#10417])
+- Exchange SinglePartition, true
+- HashAggregate(keys=[], functions=[partial_avg(3546001_421#213), partial_stddev_samp(3546001_421#213)], output=[sum#10479, count#10480L, n#10423, avg#10424, m2#10425])
+- Project [3546001_421#213]
+- Filter (isnotnull(rank#10238) && (rank#10238 <= 1800))
+- Window [rank(timestamp#10081) windowspecdefinition(Flight_Id_Int#209, timestamp#10081 DESC NULLS LAST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS rank#10238], [Flight_Id_Int#209], [timestamp#10081 DESC NULLS LAST]
+- Sort [Flight_Id_Int#209 ASC NULLS FIRST, timestamp#10081 DESC NULLS LAST], false, 0
+- ShuffleQueryStage 0
+- Exchange hashpartitioning(Flight_Id_Int#209, 200), true
+- Union
:- *(1) Project [Flight_Id_Int#209, cast((cast(timestamp#212L as double) / 1.0E9) as timestamp) AS timestamp#10081, 3546001_421#213]
我有一种强烈的感觉,首先按 msn
进行分区将有助于减少混洗的数量,因为大部分工作都在 msn
级别。
我的问题是 如何和在我的代码中我应该重新分区?我应该使用 repartition
、repartition
和密钥 hash partitioning
我一直在阅读关于这个不同的分区程序的文档,我对它们的使用感到困惑,如果这真的是我问题的解决方案.
谢谢。
编辑 1:
这是合理的计划:
GlobalLimit 1
+- LocalLimit 1
+- Aggregate [avg(3566000_421#214) AS avg(3566000_421)#10594, stddev_samp(3566000_421#214) AS stddev_samp(3566000_421)#10603]
+- Project [3566000_421#214]
+- Filter (isnotnull(rank#10238) && (rank#10238 <= 1800))
+- Window [rank(timestamp#10081) windowspecdefinition(msn#208, Flight_Id_Int#209, timestamp#10081 DESC NULLS LAST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS rank#10238], [msn#208, Flight_Id_Int#209], [timestamp#10081 DESC NULLS LAST]
+- Union
:- Project [msn#208, Flight_Id_Int#209, cast((cast(timestamp#212L as double) / 1.0E9) as timestamp) AS timestamp#10081, 3566000_421#214]
这是我从存储数据的数据湖中提取数据的代码部分。
仅供参考,这是通过自定义 API 使用名为 FoundryTS
的库完成的。重要的是,在我调用 to_dataframe()
方法之前,不会收集任何数据。我循环遍历每个 msn
以避免调用太大,然后我将所有数据帧与 unionByName
合并
# Loop over MSN to extract timeseries
df = []
for msn in msn_range:
search_results = (SeriesMetadata.M_REPORT_NUMBER == report_number) & (SeriesMetadata.M_AIRCRAFT == msn)
# Create the intervals to split TimeSeries extract by flight for each MSN
Start_int = list(df1.where(F.col("msn") == msn).select("Start").toPandas()["Start"])
End_int = list(df1.where(F.col("msn") == msn).select("End").toPandas()["End"])
flight_id = list(df1.where(F.col("msn") == msn).select("id_cmsReport").toPandas()["id_cmsReport"])
flights_interval = [Interval(
start, end, name=flight_Id
) for start, end, flight_Id in zip(
Start_int, End_int, flight_id
)]
""" Collect all the series in a node collections """
output = fts.search.series(
search_results,
object_types=["export-control-us-ear99-a220-dal-airline-series"])\
.map_by(FF.interpolate(
before='nearest',
internal='nearest',
after='nearest',
frequency=frequency,
rename_columns_by=lambda x: x.metadata["parameter_id"] + "_" + x.metadata["report_number"]),
keys='msn') \
.map_intervals(flights_interval, interval_name='Flight_Id_Int')\
.map(FF.time_range(period_start, period_end))\
.to_dataframe() # !!!! numPartitions=32 Foundry Doc : #partition = #executors see if it triggers OOM error
df.append(output)
output = df[0]
for df in df[1:]:
output = output.unionByName(df) # Same as union but matches name instead of columns order.
# Repartition by msn to improve latter calculation
N = len(msn_range)
output.repartition(N, 'msn')
“驱动程序 运行 作业崩溃,运行 内存不足,或者在 运行 时变得无响应。”
您需要解决的第一个问题是增加驱动程序(不是执行程序)的内存。spark 中的默认驱动程序内存通常很低,以至于会在许多查询中崩溃。
“我的问题是我应该在我的代码中如何以及在何处重新分区”
Spark 已经根据需要完成了添加重新分区的工作。您很可能只会通过在执行中途手动重新分区数据来创建额外的工作。一个潜在的优化是将数据存储在一个分桶的 table 中,但这只会潜在地删除第一个交换,并且只有当你的分桶列与第一个交换的哈希分区完全匹配时。
“查看查询计划我发现我有 300 多个步骤”
你上面描述的并没有走300步。这里似乎有些不对劲。您的优化逻辑计划是什么样的? mean 和 std 应该只需要扫描 -> partitial agg -> exchange -> final agg。在您提供的查询计划中,您似乎故意只查看最后 1600 个数据点而不是最后 30m。您是要执行 window 函数而不是简单的聚合(也称为分组依据)吗?
编辑:
for msn in msn_range:
IMO 这可能是您问题的一部分。此 for 循环导致执行计划非常大,这可能是您在驱动程序上遇到 OOM 问题的原因。您也许可以将它 运行slate 变成更友好的东西,并且不会在驱动程序上做太多工作,将 forloop 转换为 spark.paralellize(...).map( /你的代码/)
对于那些可能有帮助的人,
以下是我在分区时出错的地方:
.to_dataframe()
:在我们的云平台Spark中默认创建200个分区。因此,通过遍历 40 msn
,我生成了 40 x 200 分区。我最终有很多小任务要管理。
.repartition()
:由于我在 msn
上使用了 Window
和 partitionBy
,我虽然使用 msn
重新分区会加快这一步。但它引入了我的分区的完全洗牌。
结果:根据 Spark Job Tracker 和 > 55k 任务,随机写入 59 GB。任务占用了一些开销,这可以解释驱动程序崩溃的原因。
我做了什么让它起作用:
- 我去掉了
Window
函数
通过在我从 DataLake 获取数据之前在过程中进行早期过滤。我直接提取了我需要的部分航班。因此,在完全相同的部分的物理计划中更少 Exchange Partition
。
这是更新后的实体计划:
AdaptiveSparkPlan(isFinalPlan=false)
+- CollectLimit 1
+- HashAggregate(keys=[], functions=[avg(3565000_421#213), stddev_samp(3565000_421#213)], output=[avg(3565000_421)#10246, stddev_samp(3565000_421)#10255])
+- ShuffleQueryStage 0
+- Exchange SinglePartition, true
+- *(43) HashAggregate(keys=[], functions=[partial_avg(3565000_421#213), partial_stddev_samp(3565000_421#213)], output=[sum#10317, count#10318L, n#10261, avg#10262, m2#10263])
+- Union
:- *(1) Project [3565000_421#213]
: +- *(1) Scan ExistingRDD[msn#208,Flight_Id_Int#209,Flight_Id_Int.start#210L
- 我减少了分区数量:
我在 40 个 msn
的每个 .to_dataframe()
调用中任意将其设置为 5。
24 小时后构建成功。 1.1MB 随机写入和 >27 个任务。
正如@Andrew Long 指出的那样,由于 for
循环,它可能不是最佳选择。我仍在为 32 个执行程序生成至少 200 个分区,最终要管理 > 27k 个任务。
最后:
作为最后一步,我通过依赖底层 API 来获取 1 个大数据帧中的数据并强制分区为 32,从而摆脱了 for 循环。
在撰写本文时构建仍然是 运行,我将使用结果编辑 post。
但确实需要管理的任务更少(减少了 4 倍)。
编辑 1 - 更新
很高兴地报告,通过摆脱 for 循环并将数据帧划分为 64 个分区(32 个执行程序 x 2 个核心),我能够在 11 小时(而不是 24 小时)内完成相同的工作,而 1.9MB仅随机写入和 5k 任务。
PS:我在上面提到了 32 个(而不是 64 个)分区,但是 32 个分区没有成功并且并行度不是最优的(< 20)所以它花费了更长的时间并且我有空闲的执行者. 64 对我来说似乎是最佳选择。
我正在努力思考 partitioning
Spark 中数据帧的不同方式,以减少特定管道上的改组数量。
这是我正在处理的数据框,它包含超过 40 亿行和 80 列:
+-----+-------------------+-----------+
| msn| timestamp| Flight_Id |
+-----+-------------------+-----------+
|50020|2020-08-22 19:16:00| 72.0|
|50020|2020-08-22 19:15:00| 84.0|
|50020|2020-08-22 19:14:00| 96.0|
|50020|2020-08-22 19:13:00| 84.0|
|50020|2020-08-22 19:12:00| 84.0|
|50020|2020-08-22 19:11:00| 84.0|
|50020|2020-08-22 19:10:00| 84.0|
|50020|2020-08-22 19:09:00| 84.0|
|50020|2020-08-22 19:08:00| 84.0|
|50020|2020-08-22 19:07:00| 84.0|
|50020|2020-08-22 19:06:00| 84.0|
|50020|2020-08-22 19:05:00| 84.0|
|50020|2020-08-22 19:04:00| 84.0|
|50020|2020-08-22 19:03:00| 84.0|
|50020|2020-08-22 19:02:00| 84.0|
|50020|2020-08-22 19:01:00| 84.0|
|50020|2020-08-22 19:00:00| 84.0|
|50020|2020-08-22 18:59:00| 84.0|
|50020|2020-08-22 18:58:00| 84.0|
|50020|2020-08-22 18:57:00| 84.0|
+-----+-------------------+-----------+
这代表了不同飞机(总共 41 架飞机)的时间序列集合。 我只对这些数据做了两件事:
- 使用
MSN
和Flight_ID
划分的 window 以及timestamp
划分的order By
进行过滤以保留每个航班的最后 30 分钟。 - 在剩余的列中,计算
mean
和stdev
并对数据进行归一化。
我有 32 个执行程序,每个执行程序有 12g 内存,作业在 运行 30 小时后崩溃,并显示以下消息:
The driver running the job crashed, ran out of memory, or otherwise became unresponsive while it was running.
查看查询计划我注意到我有超过 300 个步骤,其中超过 60 个涉及改组(所有步骤物理计划看起来完全相同):
AdaptiveSparkPlan(isFinalPlan=false)
+- CollectLimit 1
+- HashAggregate(keys=[], functions=[avg(3546001_421#213), stddev_samp(3546001_421#213)], output=[avg(3546001_421)#10408, stddev_samp(3546001_421)#10417])
+- Exchange SinglePartition, true
+- HashAggregate(keys=[], functions=[partial_avg(3546001_421#213), partial_stddev_samp(3546001_421#213)], output=[sum#10479, count#10480L, n#10423, avg#10424, m2#10425])
+- Project [3546001_421#213]
+- Filter (isnotnull(rank#10238) && (rank#10238 <= 1800))
+- Window [rank(timestamp#10081) windowspecdefinition(Flight_Id_Int#209, timestamp#10081 DESC NULLS LAST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS rank#10238], [Flight_Id_Int#209], [timestamp#10081 DESC NULLS LAST]
+- Sort [Flight_Id_Int#209 ASC NULLS FIRST, timestamp#10081 DESC NULLS LAST], false, 0
+- ShuffleQueryStage 0
+- Exchange hashpartitioning(Flight_Id_Int#209, 200), true
+- Union
:- *(1) Project [Flight_Id_Int#209, cast((cast(timestamp#212L as double) / 1.0E9) as timestamp) AS timestamp#10081, 3546001_421#213]
我有一种强烈的感觉,首先按 msn
进行分区将有助于减少混洗的数量,因为大部分工作都在 msn
级别。
我的问题是 如何和在我的代码中我应该重新分区?我应该使用 repartition
、repartition
和密钥 hash partitioning
我一直在阅读关于这个不同的分区程序的文档,我对它们的使用感到困惑,如果这真的是我问题的解决方案.
谢谢。
编辑 1:
这是合理的计划:
GlobalLimit 1
+- LocalLimit 1
+- Aggregate [avg(3566000_421#214) AS avg(3566000_421)#10594, stddev_samp(3566000_421#214) AS stddev_samp(3566000_421)#10603]
+- Project [3566000_421#214]
+- Filter (isnotnull(rank#10238) && (rank#10238 <= 1800))
+- Window [rank(timestamp#10081) windowspecdefinition(msn#208, Flight_Id_Int#209, timestamp#10081 DESC NULLS LAST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS rank#10238], [msn#208, Flight_Id_Int#209], [timestamp#10081 DESC NULLS LAST]
+- Union
:- Project [msn#208, Flight_Id_Int#209, cast((cast(timestamp#212L as double) / 1.0E9) as timestamp) AS timestamp#10081, 3566000_421#214]
这是我从存储数据的数据湖中提取数据的代码部分。
仅供参考,这是通过自定义 API 使用名为 FoundryTS
的库完成的。重要的是,在我调用 to_dataframe()
方法之前,不会收集任何数据。我循环遍历每个 msn
以避免调用太大,然后我将所有数据帧与 unionByName
# Loop over MSN to extract timeseries
df = []
for msn in msn_range:
search_results = (SeriesMetadata.M_REPORT_NUMBER == report_number) & (SeriesMetadata.M_AIRCRAFT == msn)
# Create the intervals to split TimeSeries extract by flight for each MSN
Start_int = list(df1.where(F.col("msn") == msn).select("Start").toPandas()["Start"])
End_int = list(df1.where(F.col("msn") == msn).select("End").toPandas()["End"])
flight_id = list(df1.where(F.col("msn") == msn).select("id_cmsReport").toPandas()["id_cmsReport"])
flights_interval = [Interval(
start, end, name=flight_Id
) for start, end, flight_Id in zip(
Start_int, End_int, flight_id
)]
""" Collect all the series in a node collections """
output = fts.search.series(
search_results,
object_types=["export-control-us-ear99-a220-dal-airline-series"])\
.map_by(FF.interpolate(
before='nearest',
internal='nearest',
after='nearest',
frequency=frequency,
rename_columns_by=lambda x: x.metadata["parameter_id"] + "_" + x.metadata["report_number"]),
keys='msn') \
.map_intervals(flights_interval, interval_name='Flight_Id_Int')\
.map(FF.time_range(period_start, period_end))\
.to_dataframe() # !!!! numPartitions=32 Foundry Doc : #partition = #executors see if it triggers OOM error
df.append(output)
output = df[0]
for df in df[1:]:
output = output.unionByName(df) # Same as union but matches name instead of columns order.
# Repartition by msn to improve latter calculation
N = len(msn_range)
output.repartition(N, 'msn')
“驱动程序 运行 作业崩溃,运行 内存不足,或者在 运行 时变得无响应。”
您需要解决的第一个问题是增加驱动程序(不是执行程序)的内存。spark 中的默认驱动程序内存通常很低,以至于会在许多查询中崩溃。
“我的问题是我应该在我的代码中如何以及在何处重新分区”
Spark 已经根据需要完成了添加重新分区的工作。您很可能只会通过在执行中途手动重新分区数据来创建额外的工作。一个潜在的优化是将数据存储在一个分桶的 table 中,但这只会潜在地删除第一个交换,并且只有当你的分桶列与第一个交换的哈希分区完全匹配时。
“查看查询计划我发现我有 300 多个步骤”
你上面描述的并没有走300步。这里似乎有些不对劲。您的优化逻辑计划是什么样的? mean 和 std 应该只需要扫描 -> partitial agg -> exchange -> final agg。在您提供的查询计划中,您似乎故意只查看最后 1600 个数据点而不是最后 30m。您是要执行 window 函数而不是简单的聚合(也称为分组依据)吗?
编辑:
for msn in msn_range:
IMO 这可能是您问题的一部分。此 for 循环导致执行计划非常大,这可能是您在驱动程序上遇到 OOM 问题的原因。您也许可以将它 运行slate 变成更友好的东西,并且不会在驱动程序上做太多工作,将 forloop 转换为 spark.paralellize(...).map( /你的代码/)
对于那些可能有帮助的人,
以下是我在分区时出错的地方:
.to_dataframe()
:在我们的云平台Spark中默认创建200个分区。因此,通过遍历 40msn
,我生成了 40 x 200 分区。我最终有很多小任务要管理。.repartition()
:由于我在msn
上使用了Window
和partitionBy
,我虽然使用msn
重新分区会加快这一步。但它引入了我的分区的完全洗牌。
结果:根据 Spark Job Tracker 和 > 55k 任务,随机写入 59 GB。任务占用了一些开销,这可以解释驱动程序崩溃的原因。
我做了什么让它起作用:
- 我去掉了
Window
函数
通过在我从 DataLake 获取数据之前在过程中进行早期过滤。我直接提取了我需要的部分航班。因此,在完全相同的部分的物理计划中更少 Exchange Partition
。
这是更新后的实体计划:
AdaptiveSparkPlan(isFinalPlan=false)
+- CollectLimit 1
+- HashAggregate(keys=[], functions=[avg(3565000_421#213), stddev_samp(3565000_421#213)], output=[avg(3565000_421)#10246, stddev_samp(3565000_421)#10255])
+- ShuffleQueryStage 0
+- Exchange SinglePartition, true
+- *(43) HashAggregate(keys=[], functions=[partial_avg(3565000_421#213), partial_stddev_samp(3565000_421#213)], output=[sum#10317, count#10318L, n#10261, avg#10262, m2#10263])
+- Union
:- *(1) Project [3565000_421#213]
: +- *(1) Scan ExistingRDD[msn#208,Flight_Id_Int#209,Flight_Id_Int.start#210L
- 我减少了分区数量:
我在 40 个 msn
的每个 .to_dataframe()
调用中任意将其设置为 5。
24 小时后构建成功。 1.1MB 随机写入和 >27 个任务。
正如@Andrew Long 指出的那样,由于 for
循环,它可能不是最佳选择。我仍在为 32 个执行程序生成至少 200 个分区,最终要管理 > 27k 个任务。
最后:
作为最后一步,我通过依赖底层 API 来获取 1 个大数据帧中的数据并强制分区为 32,从而摆脱了 for 循环。 在撰写本文时构建仍然是 运行,我将使用结果编辑 post。 但确实需要管理的任务更少(减少了 4 倍)。
编辑 1 - 更新
很高兴地报告,通过摆脱 for 循环并将数据帧划分为 64 个分区(32 个执行程序 x 2 个核心),我能够在 11 小时(而不是 24 小时)内完成相同的工作,而 1.9MB仅随机写入和 5k 任务。
PS:我在上面提到了 32 个(而不是 64 个)分区,但是 32 个分区没有成功并且并行度不是最优的(< 20)所以它花费了更长的时间并且我有空闲的执行者. 64 对我来说似乎是最佳选择。