火花结构化流应用程序中的死执行者
dead executors in spark structured streaming app
我有一个简单的流式作业,它从 kafka 主题中提取数据并将其推送到 S3。
df2 = parsed_df \
.coalesce(1)\
.writeStream.format("parquet")\
.option("checkpointLocation", "<s3location>")\
.option("path","s3location")\
.partitionBy("dt")\
.outputMode("Append")\
.trigger(processingTime='150 seconds')\
.start()
触发时间为150秒。
我的 spark 配置如下。
"driverMemory": "6G",
"driverCores": 1,
"executorCores": 1,
"executorMemory": "3G",
{
"spark.dynamicAllocation.initialExecutors": "3",
"spark.dynamicAllocation.maxExecutors": "12",
"spark.driver.maxResultSize": "4g",
"spark.sql.session.timeZone":"UTC",
"spark.executor.memoryOverhead": "1g",
"spark.driver.memoryOverhead": "2g",
"spark.dynamicAllocation.enabled": "true",
"spark.rpc.message.maxSize": "1024",
"spark.streaming.receiver.maxRate": "4000",
"spark.port.maxRetries" : "100",
"spark.jars.packages": "org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.4,org.apache.spark:spark-streaming-kafka-0-10-assembly_2.12:2.4.4"
}
工作 运行 很好。但是当我检查我的 spark UI 时,我看到许多死执行者。
这些死去的执行者还在不断增加。
对于每批 150 秒,我正在处理 3-5k 个事件。
我的问题是:-
- 这是一个有效的场景吗?
- 如果这不是一个有效的场景,那么可能是什么原因?是不是因为动态分配属性设置为true?
是的,启用动态分配时这是一个有效的方案。
在结构化流中,数据以微批处理。如果执行器空闲超时小于微批持续时间,则执行器会不断添加和删除。但是,如果执行程序空闲超时大于批处理持续时间,则永远不会删除执行程序。控制此行为的 属性 是“spark.dynamicAllocation.executorIdleTimeout”,默认值为 60 秒。
因此,如果没有 activity,则 60 秒内将删除执行程序。在您的情况下,由于触发间隔为 150 秒,spark 处理 3-5k 事件的微批次相当快,并且执行程序有可能闲置超过 60 秒,因此被删除。
要更改此行为,请添加新配置“spark.dynamicAllocation.executorIdleTimeout”并将其设置为更高的值(比如 300 秒)。
我有一个简单的流式作业,它从 kafka 主题中提取数据并将其推送到 S3。
df2 = parsed_df \
.coalesce(1)\
.writeStream.format("parquet")\
.option("checkpointLocation", "<s3location>")\
.option("path","s3location")\
.partitionBy("dt")\
.outputMode("Append")\
.trigger(processingTime='150 seconds')\
.start()
触发时间为150秒。 我的 spark 配置如下。
"driverMemory": "6G",
"driverCores": 1,
"executorCores": 1,
"executorMemory": "3G",
{
"spark.dynamicAllocation.initialExecutors": "3",
"spark.dynamicAllocation.maxExecutors": "12",
"spark.driver.maxResultSize": "4g",
"spark.sql.session.timeZone":"UTC",
"spark.executor.memoryOverhead": "1g",
"spark.driver.memoryOverhead": "2g",
"spark.dynamicAllocation.enabled": "true",
"spark.rpc.message.maxSize": "1024",
"spark.streaming.receiver.maxRate": "4000",
"spark.port.maxRetries" : "100",
"spark.jars.packages": "org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.4,org.apache.spark:spark-streaming-kafka-0-10-assembly_2.12:2.4.4"
}
工作 运行 很好。但是当我检查我的 spark UI 时,我看到许多死执行者。
这些死去的执行者还在不断增加。 对于每批 150 秒,我正在处理 3-5k 个事件。 我的问题是:-
- 这是一个有效的场景吗?
- 如果这不是一个有效的场景,那么可能是什么原因?是不是因为动态分配属性设置为true?
是的,启用动态分配时这是一个有效的方案。
在结构化流中,数据以微批处理。如果执行器空闲超时小于微批持续时间,则执行器会不断添加和删除。但是,如果执行程序空闲超时大于批处理持续时间,则永远不会删除执行程序。控制此行为的 属性 是“spark.dynamicAllocation.executorIdleTimeout”,默认值为 60 秒。
因此,如果没有 activity,则 60 秒内将删除执行程序。在您的情况下,由于触发间隔为 150 秒,spark 处理 3-5k 事件的微批次相当快,并且执行程序有可能闲置超过 60 秒,因此被删除。
要更改此行为,请添加新配置“spark.dynamicAllocation.executorIdleTimeout”并将其设置为更高的值(比如 300 秒)。