多台机器上的芹菜任务
Celery tasks on multiple machines
我有一台服务器,其中安装了 RabbitMQ 代理和两个 Celery 消费者(main1.py 和 main2.py) 都连接到同一个代理。
在第一个消费者(main1.py)中,我实现了一个 Celery Beat,它在特定队列上多次发送不同的任务:
app = Celery('tasks', broker=..., backend=...)
app.conf.task_routes = (
[
('tasks.beat', {'queue': 'print-queue'}),
],
)
app.conf.beat_schedule = {
'beat-every-10-seconds': {
'task': 'tasks.beat',
'schedule': 10.0
},
}
@app.task(name='tasks.beat', bind=True)
def beat(self):
for i in range(10):
app.send_task("tasks.print", args=[i], queue="print-queue")
return None
在第二个消费者(main2.py)中,我实现了上面说的任务:
app = Celery('tasks', broker=..., backend=...)
app.conf.task_routes = (
[
('tasks.print', {'queue': 'print-queue'}),
],
)
@app.task(name='tasks.print', bind=True)
def print(self, name):
return name
当我启动两个 Celery worker 时:
consumer1: celery worker -A main1 -Q print-queue --beat
consumer2: celery worker -A main2 -Q print-queue
我收到这些错误:
[ERROR/MainProcess] Received unregistered task of type 'tasks.print'
第一个消费者
[ERROR/MainProcess] Received unregistered task of type 'tasks.beat'
在第二个消费者上
是否可以在连接到同一代理的不同 Celery 应用程序上拆分任务?
提前致谢!
这是正在发生的事情。你有两个工人 A
和 B
其中一个也恰好是 运行ning celery beat(假设一个是 B
)。
- celery beat 将
task.beat
提交到队列。所有这一切都是在 rabbit 中使用一些元数据(包括任务名称)排队消息。
两名工作人员中的一位阅读了邮件。 A 和 B 都在监听同一个队列,因此任何一个都可以读取它。
一个。如果 A 读取消息,它将尝试查找名为 tasks.beat
的任务,这会爆炸,因为 A 没有定义该任务。
b。如果 B 读取消息,它将成功地尝试找到名为 tasks.beat
的任务(因为它 确实 有该任务)并且将 运行 代码。 tasks.beat
将在 rabbit 中加入一条包含 tasks.print
.
元数据的新消息
- 同样的问题会再次出现,因为只有 A 和 B 中的一个定义了
tasks.print
,但其中任何一个都可能收到消息。
实际上,celery 可能会做一些检查以更早地抛出错误消息,但我相当确定这是潜在的问题。
简而言之,队列中的所有工作人员(包括节拍)都应该运行使用相同的代码。
我有一台服务器,其中安装了 RabbitMQ 代理和两个 Celery 消费者(main1.py 和 main2.py) 都连接到同一个代理。
在第一个消费者(main1.py)中,我实现了一个 Celery Beat,它在特定队列上多次发送不同的任务:
app = Celery('tasks', broker=..., backend=...)
app.conf.task_routes = (
[
('tasks.beat', {'queue': 'print-queue'}),
],
)
app.conf.beat_schedule = {
'beat-every-10-seconds': {
'task': 'tasks.beat',
'schedule': 10.0
},
}
@app.task(name='tasks.beat', bind=True)
def beat(self):
for i in range(10):
app.send_task("tasks.print", args=[i], queue="print-queue")
return None
在第二个消费者(main2.py)中,我实现了上面说的任务:
app = Celery('tasks', broker=..., backend=...)
app.conf.task_routes = (
[
('tasks.print', {'queue': 'print-queue'}),
],
)
@app.task(name='tasks.print', bind=True)
def print(self, name):
return name
当我启动两个 Celery worker 时:
consumer1: celery worker -A main1 -Q print-queue --beat
consumer2: celery worker -A main2 -Q print-queue
我收到这些错误:
[ERROR/MainProcess] Received unregistered task of type 'tasks.print'
第一个消费者
[ERROR/MainProcess] Received unregistered task of type 'tasks.beat'
在第二个消费者上
是否可以在连接到同一代理的不同 Celery 应用程序上拆分任务?
提前致谢!
这是正在发生的事情。你有两个工人 A
和 B
其中一个也恰好是 运行ning celery beat(假设一个是 B
)。
- celery beat 将
task.beat
提交到队列。所有这一切都是在 rabbit 中使用一些元数据(包括任务名称)排队消息。 两名工作人员中的一位阅读了邮件。 A 和 B 都在监听同一个队列,因此任何一个都可以读取它。
一个。如果 A 读取消息,它将尝试查找名为
tasks.beat
的任务,这会爆炸,因为 A 没有定义该任务。b。如果 B 读取消息,它将成功地尝试找到名为
tasks.beat
的任务(因为它 确实 有该任务)并且将 运行 代码。tasks.beat
将在 rabbit 中加入一条包含tasks.print
. 元数据的新消息
- 同样的问题会再次出现,因为只有 A 和 B 中的一个定义了
tasks.print
,但其中任何一个都可能收到消息。
实际上,celery 可能会做一些检查以更早地抛出错误消息,但我相当确定这是潜在的问题。
简而言之,队列中的所有工作人员(包括节拍)都应该运行使用相同的代码。