芹菜。 运行 单个celery beat + 多个celery worker scale

Celery. Running single celery beat + multiple celery workers scale

单芹菜击败运行宁:

celery -A app:celery beat --loglevel=DEBUG

和三名工人 运行宁:

celery -A app:celery worker -E --loglevel=ERROR -n n1
celery -A app:celery worker -E --loglevel=ERROR -n n2
celery -A app:celery worker -E --loglevel=ERROR -n n3

同一个 Redis 数据库用作所有 worker 和 beat 的消息代理。 所有工作人员都在同一台机器上开始用于开发目的,而他们将在生产中使用不同的 Kubernetes pods 进行部署。使用多个 worker 在不同的 Kube pods 之间分配 50-150 个任务的主要思想每个 运行ning 在 4-8 核机器上。 我们预计 pod 的 none 将承担比他拥有的核心更多的任务,直到存在任何任务少于可用核心的工作人员,以便同时执行最大数量的任务。

所以我无法在本地进行测试。 这里是本地beat触发三个任务:

[2021-08-23 21:35:32,700: DEBUG/MainProcess] Current schedule:
<ScheduleEntry: task-5872-accrual Task5872Accrual() <crontab: 36 21 * * * (m/h/d/dM/MY)>
<ScheduleEntry: task-5872-accrual2 Task5872Accrual2() <crontab: 37 21 * * * (m/h/d/dM/MY)>
<ScheduleEntry: task-5872-accrual3 Task5872Accrual3() <crontab: 38 21 * * * (m/h/d/dM/MY)>
[2021-08-23 21:35:32,700: DEBUG/MainProcess] beat: Ticking with max interval->5.00 minutes
[2021-08-23 21:35:32,701: DEBUG/MainProcess] beat: Waking up in 27.29 seconds.
[2021-08-23 21:36:00,017: DEBUG/MainProcess] beat: Synchronizing schedule...
[2021-08-23 21:36:00,026: INFO/MainProcess] Scheduler: Sending due task task-5872-accrual (Task5872Accrual)
[2021-08-23 21:36:00,035: DEBUG/MainProcess] Task5872Accrual sent. id->96e671f8-bd07-4c36-a595-b963659bee5c
[2021-08-23 21:36:00,035: DEBUG/MainProcess] beat: Waking up in 59.95 seconds.
[2021-08-23 21:37:00,041: INFO/MainProcess] Scheduler: Sending due task task-5872-accrual2 (Task5872Accrual2)
[2021-08-23 21:37:00,043: DEBUG/MainProcess] Task5872Accrual2 sent. id->532eac4d-1d10-4117-9d7e-16b3f1ae7aee
[2021-08-23 21:37:00,043: DEBUG/MainProcess] beat: Waking up in 59.95 seconds.
[2021-08-23 21:38:00,027: INFO/MainProcess] Scheduler: Sending due task task-5872-accrual3 (Task5872Accrual3)
[2021-08-23 21:38:00,029: DEBUG/MainProcess] Task5872Accrual3 sent. id->68729b64-807d-4e13-8147-0b372ce536af
[2021-08-23 21:38:00,029: DEBUG/MainProcess] beat: Waking up in 5.00 minutes.

我希望每个 worker 将执行单个任务来优化工作人员之间的负载,但不幸的是,这里是如何分配它们的:

所以我不确定不同的 worker 之间是如何同步的,以便在它们之间顺利地分配负载?如果不能,我能以某种方式实现吗?尝试在 Google 中搜索,但在单个 worker 中主要是关于任务之间的并发性,但是如果我需要 运行 比 Kube claster 中的单台机器同时执行更多任务怎么办?

你应该做两件事来实现你想要的:

  • 运行 名具有 -O fair 选项的工人。示例:celery -A app:celery worker -E --loglevel=ERROR -n n1 -O fair
  • 在您的配置中使用 worker_prefetch_multiplier=1 让工作人员尽可能少地进行预取。