Databricks/python - 创建稳健的长 运行 作业的最佳实践方法是什么
Databricks/python - what is a best practice approach to create a robust long running job
我找不到关于如何创建具有中等失败可能性的工作的很好的概述。
我是一名经验丰富的开发人员,但我对 databricks/spark 比较陌生。虽然我可以通过编程解决问题,但我正在寻找最佳实践解决方案。
我的场景是从网络中读取大量行 API。这项工作大约需要 36 个小时才能 运行。在这 36 小时内,我在与 API 交互时很有可能会遇到致命错误(超时、读取时断开连接、invalid/unexpected return 值等)。虽然我可以越来越多地使我的工作对这些错误更加健壮,但理想情况下,我不必再次 运行 整个工作来恢复。理想情况下,我只需要 运行 失败的案例。
我的基本流程是这样的:
- 读入一组精选的 ID(100 的数千个)
- 对于每个 ID,调用网络 API 以获取详细信息
- 将结果输出写入新的table(ID + 详细信息)
我评估过的方法:
- 尝试捕获一揽子模式中的所有错误并将失败输出到结果 table 中。然后恢复是在修补导致失败的任何内容后读取失败的行作为 ID 的来源。
- 将初始数据集划分为多个文件,并拼凑出一些可以在各个分区上安排工作的东西。如果其中一项失败,则重新运行 单个分区。全部成功后,聚合结果。我认为这是可行的,但由于我对数据块的了解有限,它看起来很乱。我会做我自己的分区和任务调度。我希望有更好的方法。
我脑海中想象的解决方案是这样的:
# Split the source table into 100 equal buckets
# Run only buckets 10,20,21 (presumably, those are the failed buckets)
# For each bucket, run the udf get_details
# If the bucket succeeds, put it's rows into aggregate_df. Otherwise, into error_df
aggregate_df, error_df = df.split_table_evenly(bucket_count=100)
.options(continue_on_task_failure=true)
.filter(bucket=[10,20,21])
.run_task_on_bucket(udf=get_details)
解决这个问题的方法是使用 Spark Structured Streaming,它支持流式查询的检查点。 Spark 指南非常详尽地描述了如何在不同场景中使用结构化流。它没有明确涵盖我的用例 - 按行分解数据集 - 我将在这里描述它。
基本方法是将流分解为 Spark 术语 'micro-batches'。对于上面的场景,要理解的重要一点是,spark batches by time 而我想按 rows 进行批处理。 Spark 有一个提供程序 kind-of 支持按数据进行批处理 - Kafka 提供程序可以基于偏移量进行批处理。因为我不想运行我的数据通过Kafka,所以我选择不使用这种方式。
文件源确实有一个我们可以使用的工具:它能够使用 maxFilesPerTrigger
选项设置 。我也使用 latestFirst
先处理最旧的文件,但这不是必需的。
source_dataframe = self.sparksession.readStream.option('maxFilesPerTrigger', 1) \
.option('latestFirst', True) \
.format('delta') \
.load(path)
因为这只适用于整个文件,所以我需要在生成时限制文件的大小。为此,我只是使用可生成舒适存储桶大小的密钥对我的数据集进行分区。
因为拥有很多文件对于大多数用途来说并不是非常高效,所以我选择将这个数据集写入两次,所以除非我正在检查点,否则不要支付读取许多小文件的成本。但是,这完全是可选的。
dataframe = # Some query
curated_output_dataframe = dataframe
generate_job_input_dataframe = dataframe.partitionBy('somecolumn')
curated_output_dataframe.write.format(my_format).save(path=my_curated_output_path)
generate_job_data_dataframe.write.format(my_format).save(path=my_job_data_path)
然后,您可以使用流函数读取和写入数据集,您几乎可以开始了。
source_dataframe = self.sparksession.readStream.option('maxFilesPerTrigger', 1).format(input_path).load(input_path)
query = dataframe.writeStream.format(output_path).start('output_path')
query.awaitTermination()
但是,在以稳健的方式执行此操作之前,还有一些事情需要注意。
如果你想恢复你的工作,你需要设置一个检查点位置。
sparkSession.readStream.option('checkpointLocation','/_checkpoints/some_unique_directory')
如果您不更改某些设置,Spark 将贪婪地读取所有输入文件。
有关详细信息,请参阅 。
您应该估算您的费率并将起始目标费率设置为该费率。我一开始就把我的设置得很低——火花会从这个速度调整到它可持续的任何东西。
sparksession.conf.set('spark.streaming.backpressure.enabled', True)
sparksession.conf.set('spark.streaming.backpressure.initialRate', target_rate_per_second)
sparksession.conf.set('spark.streaming.backpressure.rateEstimator', 'pid')
sparksession.conf.set('spark.streaming.backpressure.pid.minRate', 1)
除非您监视作业,否则 Spark 将永远 运行。 Spark 无限期地承担流作业 运行,并且没有明确支持 运行ning 直到数据集耗尽。您必须自己编写代码,不幸的是代码很脆弱,因为在某些情况下您必须检查作业状态消息。
有关详细信息,请参阅 。答案中的代码并非 100% 对我有用——我在消息表达式中发现了更多情况。这是我当前使用的代码(删除了日志记录和注释):
while query.isActive:
msg = query.status['message']
data_avail = query.status['isDataAvailable']
trigger_active = query.status['isTriggerActive']
if not data_avail and not trigger_active:
if 'Initializing' not in msg:
query.stop()
time.sleep(poll_interval_seconds)
其他注意事项这些不是检查点所必需的micro-batches,但在其他方面很有用。
Spark 生成有用的日志 - 这些是我在建立对流式执行的信任时发现有用的日志:
22/05/18 23:22:32 INFO MicroBatchExecution: Streaming query made progress:
22/05/18 23:24:44 WARN ProcessingTimeExecutor: Current batch is falling behind. The trigger interval is 500 milliseconds, but spent 5593 milliseconds
可以通过
启用 PID 估算器的日志记录
sparksession.conf.set('log4j.logger.org.apache.spark.streaming.scheduler.rate.PIDRateEstimator', 'TRACE')
Spark 有 recently added RocksDB support for streaming state management,您需要明确启用它。它对我来说很顺利。
sparksession.conf.set('spark.sql.streaming.stateStore.providerClass', \
'org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider')
我找不到关于如何创建具有中等失败可能性的工作的很好的概述。
我是一名经验丰富的开发人员,但我对 databricks/spark 比较陌生。虽然我可以通过编程解决问题,但我正在寻找最佳实践解决方案。
我的场景是从网络中读取大量行 API。这项工作大约需要 36 个小时才能 运行。在这 36 小时内,我在与 API 交互时很有可能会遇到致命错误(超时、读取时断开连接、invalid/unexpected return 值等)。虽然我可以越来越多地使我的工作对这些错误更加健壮,但理想情况下,我不必再次 运行 整个工作来恢复。理想情况下,我只需要 运行 失败的案例。
我的基本流程是这样的:
- 读入一组精选的 ID(100 的数千个)
- 对于每个 ID,调用网络 API 以获取详细信息
- 将结果输出写入新的table(ID + 详细信息)
我评估过的方法:
- 尝试捕获一揽子模式中的所有错误并将失败输出到结果 table 中。然后恢复是在修补导致失败的任何内容后读取失败的行作为 ID 的来源。
- 将初始数据集划分为多个文件,并拼凑出一些可以在各个分区上安排工作的东西。如果其中一项失败,则重新运行 单个分区。全部成功后,聚合结果。我认为这是可行的,但由于我对数据块的了解有限,它看起来很乱。我会做我自己的分区和任务调度。我希望有更好的方法。
我脑海中想象的解决方案是这样的:
# Split the source table into 100 equal buckets
# Run only buckets 10,20,21 (presumably, those are the failed buckets)
# For each bucket, run the udf get_details
# If the bucket succeeds, put it's rows into aggregate_df. Otherwise, into error_df
aggregate_df, error_df = df.split_table_evenly(bucket_count=100)
.options(continue_on_task_failure=true)
.filter(bucket=[10,20,21])
.run_task_on_bucket(udf=get_details)
解决这个问题的方法是使用 Spark Structured Streaming,它支持流式查询的检查点。 Spark 指南非常详尽地描述了如何在不同场景中使用结构化流。它没有明确涵盖我的用例 - 按行分解数据集 - 我将在这里描述它。
基本方法是将流分解为 Spark 术语 'micro-batches'。对于上面的场景,要理解的重要一点是,spark batches by time 而我想按 rows 进行批处理。 Spark 有一个提供程序 kind-of 支持按数据进行批处理 - Kafka 提供程序可以基于偏移量进行批处理。因为我不想运行我的数据通过Kafka,所以我选择不使用这种方式。
文件源确实有一个我们可以使用的工具:它能够使用 maxFilesPerTrigger
选项设置 latestFirst
先处理最旧的文件,但这不是必需的。
source_dataframe = self.sparksession.readStream.option('maxFilesPerTrigger', 1) \
.option('latestFirst', True) \
.format('delta') \
.load(path)
因为这只适用于整个文件,所以我需要在生成时限制文件的大小。为此,我只是使用可生成舒适存储桶大小的密钥对我的数据集进行分区。
因为拥有很多文件对于大多数用途来说并不是非常高效,所以我选择将这个数据集写入两次,所以除非我正在检查点,否则不要支付读取许多小文件的成本。但是,这完全是可选的。
dataframe = # Some query
curated_output_dataframe = dataframe
generate_job_input_dataframe = dataframe.partitionBy('somecolumn')
curated_output_dataframe.write.format(my_format).save(path=my_curated_output_path)
generate_job_data_dataframe.write.format(my_format).save(path=my_job_data_path)
然后,您可以使用流函数读取和写入数据集,您几乎可以开始了。
source_dataframe = self.sparksession.readStream.option('maxFilesPerTrigger', 1).format(input_path).load(input_path)
query = dataframe.writeStream.format(output_path).start('output_path')
query.awaitTermination()
但是,在以稳健的方式执行此操作之前,还有一些事情需要注意。
如果你想恢复你的工作,你需要设置一个检查点位置。
sparkSession.readStream.option('checkpointLocation','/_checkpoints/some_unique_directory')
如果您不更改某些设置,Spark 将贪婪地读取所有输入文件。
有关详细信息,请参阅
您应该估算您的费率并将起始目标费率设置为该费率。我一开始就把我的设置得很低——火花会从这个速度调整到它可持续的任何东西。
sparksession.conf.set('spark.streaming.backpressure.enabled', True)
sparksession.conf.set('spark.streaming.backpressure.initialRate', target_rate_per_second)
sparksession.conf.set('spark.streaming.backpressure.rateEstimator', 'pid')
sparksession.conf.set('spark.streaming.backpressure.pid.minRate', 1)
除非您监视作业,否则 Spark 将永远 运行。 Spark 无限期地承担流作业 运行,并且没有明确支持 运行ning 直到数据集耗尽。您必须自己编写代码,不幸的是代码很脆弱,因为在某些情况下您必须检查作业状态消息。
有关详细信息,请参阅
while query.isActive:
msg = query.status['message']
data_avail = query.status['isDataAvailable']
trigger_active = query.status['isTriggerActive']
if not data_avail and not trigger_active:
if 'Initializing' not in msg:
query.stop()
time.sleep(poll_interval_seconds)
其他注意事项这些不是检查点所必需的micro-batches,但在其他方面很有用。
Spark 生成有用的日志 - 这些是我在建立对流式执行的信任时发现有用的日志:
22/05/18 23:22:32 INFO MicroBatchExecution: Streaming query made progress:
22/05/18 23:24:44 WARN ProcessingTimeExecutor: Current batch is falling behind. The trigger interval is 500 milliseconds, but spent 5593 milliseconds
可以通过
启用 PID 估算器的日志记录sparksession.conf.set('log4j.logger.org.apache.spark.streaming.scheduler.rate.PIDRateEstimator', 'TRACE')
Spark 有 recently added RocksDB support for streaming state management,您需要明确启用它。它对我来说很顺利。
sparksession.conf.set('spark.sql.streaming.stateStore.providerClass', \
'org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider')