PythonOperator 任务在访问 Cloud Storage 时挂起并按 SCHEDULED 堆叠

PythonOperator task hangs accessing Cloud Storage and is stacked as SCHEDULED

我的 DAG 中的一个任务在访问 Cloud Storage 时有时会挂起。代码似乎在此处的 download 函数处停止:

hook = GoogleCloudStorageHook(google_cloud_storage_conn_id='google_cloud_default') for input_file in hook.list(bucket, prefix=folder): hook.download(bucket=bucket, object=input_file)

在我的测试中,该文件夹包含一个 20Mb json 文件。

该任务通常需要 20-30 秒,但在某些情况下它会 运行 5 分钟,之后它的状态更新为 SCHEDULED 并卡在那里(等待超过6个小时)。我怀疑这 5 分钟是由于配置 scheduler_zombie_task_threshold 300 但不确定。

如果我在 Web 上手动清除任务 UI,任务会快速排队并且 运行 再次正确。我通过设置一个 execution_timeout 来解决这个问题,当它需要超过 10 分钟时,它将任务正确更新为 FAILEDUP_FOR_RETRY 状态;但我想解决根本问题以避免依赖固定的超时阈值,有什么建议吗?

Cloud Composer 讨论组对此进行了讨论:https://groups.google.com/d/msg/cloud-composer-discuss/alnKzMjEj8Q/0lbp3bTlAgAJ。这是 Celery 执行器的问题,当 Airflow 工作人员死亡时。

虽然 Composer 正在修复,但如果您希望在当前版本中减少这种情况发生的频率,您可以考虑减少并行度 Airflow 配置或创建具有更大机器类型的新环境。