将现有的芹菜工人用于 Airflow 的 Celeryexecutor 工人

Use existing celery workers for Airflow's Celeryexecutor workers

我正在尝试将动态工作流引入到我的环境中,它涉及不同模型推理的多个步骤,其中一个模型的输出被馈送到另一个模型 model.Currently 我们几乎没有跨主机分布的 Celery worker 来管理推理链。随着复杂性的增加,我们正在尝试动态构建工作流。为此,我使用 Celeryexecutor 进行了动态 DAG 设置。现在,有没有一种方法可以保留当前的 ​​Celery 设置并将气流驱动的任务路由给相同的工作人员?我确实了解这些工作人员中的设置应该可以访问与气流服务器相同的 DAG 文件夹和环境。我想知道如何在这些服务器中启动 celery worker,以便气流可以路由以前由 python 应用程序的手动工作流完成的相同任务。如果我使用命令“airflow celery worker”启动工作人员,我将无法访问我的应用程序任务。如果我按照目前的方式启动芹菜,即“celery -A proj”,气流与它无关。寻找让它发挥作用的想法。

感谢@DejanLekic。我得到了它的工作(尽管 DAG 任务调度延迟太多,我放弃了这种方法)。如果有人想看看这是如何完成的,这里有一些我为让它工作所做的事情。

  1. 更改 airflow.cfg 以更改执行程序、队列和结果 back-end 设置(显而易见)
  2. 如果我们必须使用在气流伞外生成的 Celery worker,请将 celery_app_name 设置更改为 celery.execute 而不是 airflow.executors.celery_execute 并将 Executor 更改为“LocalExecutor”。我没有对此进行测试,但甚至可以通过在项目的芹菜应用程序中注册气流的任务来避免切换到芹菜执行器。
  3. 每个任务现在将调用 send_task(),然后返回的 AsynResult object 存储在 Xcom(隐式或显式)或 Redis 中(隐式推送到队列)和 child 任务然后会收集 Asyncresult(这将是从 Xcom 或 Redis 获取值的隐式调用),然后调用 .get() 以获取上一步的结果。

注意:没有必要在 DAG 的两个任务之间拆分 send_task() 和 .get()。通过在 parent 和 child 之间拆分它们,我试图利用任务之间的延迟。但就我而言,任务的 celery 执行速度比气流在调度相关任务中的固有延迟更快。