文件依赖 DAG 执行

File dependend DAG execution

我构建了一个 DAG,首先通过 SFTPOperator 下载文件,我保存它并使用 PythonOperator 继续访问和处理它。

我对这种方法完全没有问题,直到我开始将我的 celery-worker 从 1 扩展到 2。 现在我 运行 解决了一个文件在两个 worker 中都不可用的问题。

我该如何解决?我是否通过 SFTPHook 下载文件并合并这些任务? 我可以将传播限制在不同的工人身上吗?

亲切的问候, 信条代码

您应该在工作人员之间创建一个共享位置。如果有多个工作人员,您无法保证两个任务将由同一个工作人员运行完成。

AWS EFS 或类似的东西,您可以在其中下载该文件并稍后阅读,就足够了。我不擅长基础架构,所以我无法帮助您了解实现细节,但这是我用于类似问题的解决方案。