我可以将 luigi 与 Python 芹菜一起使用吗

Can i use luigi with Python celery

我正在为我的网络应用程序使用芹菜。 Celery 执行父任务,然后执行进一步的任务管道

芹菜的问题

  1. 我无法使用 luigi 获取依赖关系图和可视化工具来查看父任务的状态

  2. Celery 不提供重新启动失败的管道并从失败的地方开始的机制。

这两个我可以很容易地从 luigi 得到。

所以我在想,一旦 celery 运行父任务,然后在该任务中我执行 Luigi pipleine。

这会不会有任何问题,即我需要根据 queuesize 自动缩放 celery worker。这会影响多台机器上的任何 luigi worker 吗?

从未尝试过,但我认为应该可以在 celery 任务中调用 luigi 任务表单,就像您从 python 一般代码中调用一样:

from foobar import MyTask
from luigi import scheduler

task = MyTask(123, 'another parameter value')
sch = scheduler.CentralPlannerScheduler()
w = worker.Worker(scheduler=sch)
w.add(task)
w.run()

关于扩展你的队列和 celery worker:如果你有太多的 celery worker 调用 luigi 任务,当然它会要求你扩展你的 luigi scheduler/daemon 以便它可以处理 API 的数量请求(每次调用要执行的任务时,每隔 N 秒就会点击 luigi 调度程序 API - 这取决于你的配置 - 你的任务将点击调度程序 API 说 "I'm alive",每次任务以 -error 或 success- 完成时,您都会点击调度程序 API,依此类推。

所以,是的,仔细看看你的调度程序,看看它是否收到了太多的 http 请求,或者它的数据库是否成为瓶颈(luigi 默认使用 sqlite,但你可以轻松地将它更改为 mysql o postgres).

更新:

因为 version 2.7.0, luigi.scheduler.CentralPlannerScheduler has been renamed to luigi.scheduler.Scheduler as you may see here 所以上面的代码现在应该是:

from foobar import MyTask
from luigi import scheduler

task = MyTask(123, 'another parameter value')
sch = scheduler.Scheduler()
w = worker.Worker(scheduler=sch)
w.add(task)
w.run()