多台机器上的芹菜任务

Celery tasks on multiple machines

我有一台服务器,其中安装了 RabbitMQ 代理和两个 Celery 消费者(main1.pymain2.py) 都连接到同一个代理。

在第一个消费者(main1.py)中,我实现了一个 Celery Beat,它在特定队列上多次发送不同的任务:

app = Celery('tasks', broker=..., backend=...)
app.conf.task_routes = (
    [
        ('tasks.beat', {'queue': 'print-queue'}),
    ],
)
app.conf.beat_schedule = {
    'beat-every-10-seconds': {
        'task': 'tasks.beat',
        'schedule': 10.0
    },
}

@app.task(name='tasks.beat', bind=True)
def beat(self):
    for i in range(10):
        app.send_task("tasks.print", args=[i], queue="print-queue")

    return None

在第二个消费者(main2.py)中,我实现了上面说的任务:

app = Celery('tasks', broker=..., backend=...)
app.conf.task_routes = (
    [
        ('tasks.print', {'queue': 'print-queue'}),
    ],
)

@app.task(name='tasks.print', bind=True)
def print(self, name):
    return name

当我启动两个 Celery worker 时:

consumer1: celery worker -A main1 -Q print-queue --beat
consumer2: celery worker -A main2 -Q print-queue

我收到这些错误:

[ERROR/MainProcess] Received unregistered task of type 'tasks.print'

第一个消费者

[ERROR/MainProcess] Received unregistered task of type 'tasks.beat'

在第二个消费者上

是否可以在连接到同一代理的不同 Celery 应用程序上拆分任务?

提前致谢!

这是正在发生的事情。你有两个工人 AB 其中一个也恰好是 运行ning celery beat(假设一个是 B)。

  1. celery beat 将 task.beat 提交到队列。所有这一切都是在 rabbit 中使用一些元数据(包括任务名称)排队消息。
  2. 两名工作人员中的一位阅读了邮件。 A 和 B 都在监听同一个队列,因此任何一个都可以读取它。

    一个。如果 A 读取消息,它将尝试查找名为 tasks.beat 的任务,这会爆炸,因为 A 没有定义该任务。

    b。如果 B 读取消息,它将成功地尝试找到名为 tasks.beat 的任务(因为它 确实 有该任务)并且将 运行 代码。 tasks.beat 将在 rabbit 中加入一条包含 tasks.print.

  3. 元数据的新消息
  4. 同样的问题会再次出现,因为只有 A 和 B 中的一个定义了 tasks.print,但其中任何一个都可能收到消息。

实际上,celery 可能会做一些检查以更早地抛出错误消息,但我相当确定这是潜在的问题。

简而言之,队列中的所有工作人员(包括节拍)都应该运行使用相同的代码。