如何使用 Celery 和 Django 将任务路由到不同的队列
How to route tasks to different queues with Celery and Django
我正在使用以下堆栈:
- Python 3.6
- Celery v4.2.1(经纪人:RabbitMQ v3.6.0)
- Django v2.0.4.
根据Celery's documentation,运行在不同队列上设置定时任务应该和为CELERY_ROUTES[=49=上的任务定义相应的队列一样简单],尽管如此,所有任务似乎都在 Celery 的默认队列中执行。
这是my_app/settings.py上的配置:
CELERY_BROKER_URL = "amqp://guest:guest@localhost:5672//"
CELERY_ROUTES = {
'app1.tasks.*': {'queue': 'queue1'},
'app2.tasks.*': {'queue': 'queue2'},
}
CELERY_BEAT_SCHEDULE = {
'app1_test': {
'task': 'app1.tasks.app1_test',
'schedule': 15,
},
'app2_test': {
'task': 'app2.tasks.app2_test',
'schedule': 15,
},
}
这些任务只是用于测试路由的简单脚本:
文件app1/tasks.py:
from my_app.celery import app
import time
@app.task()
def app1_test():
print('I am app1_test task!')
time.sleep(10)
文件app2/tasks.py:
from my_app.celery import app
import time
@app.task()
def app2_test():
print('I am app2_test task!')
time.sleep(10)
当我 运行 带有所有必需队列的 Celery 时:
celery -A my_app worker -B -l info -Q celery,queue1,queue2
RabbitMQ 将显示只有默认队列“celery”运行正在执行任务:
sudo rabbitmqctl list_queues
# Tasks executed by each queue:
# - celery 2
# - queue1 0
# - queue2 0
有人知道如何解决这种意外行为吗?
此致,
向装饰器添加 queue
参数可能会对您有所帮助,
@app.task(<b>queue='queue1'</b>)
def app1_test():
print('I am app1_test task!')
time.sleep(10)
我已经成功了,这里有几点需要注意:
根据Celery's 4.2.0 documentation,CELERY_ROUTES
应该是定义队列路由的变量,但它只对我使用 CELERY_TASK_ROUTES
有效。任务路由似乎独立于 Celery Beat,因此这仅适用于手动安排的任务:
app1_test.delay()
app2_test.delay()
或
app1_test.apply_async()
app2_test.apply_async()
为了让它与 Celery Beat 一起工作,我们只需要在 CELERY_BEAT_SCHEDULE
变量中明确定义队列。文件 my_app/settings.py
的最终设置如下:
CELERY_BROKER_URL = "amqp://guest:guest@localhost:5672//"
CELERY_TASK_ROUTES = {
'app1.tasks.*': {'queue': 'queue1'},
'app2.tasks.*': {'queue': 'queue2'},
}
CELERY_BEAT_SCHEDULE = {
'app1_test': {
'task': 'app1.tasks.app1_test',
'schedule': 15,
'options': {'queue': 'queue1'}
},
'app2_test': {
'task': 'app2.tasks.app2_test',
'schedule': 15,
'options': {'queue': 'queue2'}
},
}
然后 运行 芹菜监听这两个队列:
celery -A my_app worker -B -l INFO -Q queue1,queue2
在哪里
-A
: 项目或应用的名称。
-B
:启动任务调度程序Celery beat。
-l
: 定义日志级别。
-Q
: 定义这个worker处理的队列。
我希望这能为其他开发者节省一些时间。
好的,因为我已经尝试了您用于 运行 worker 的相同命令,所以我发现您只需删除 -Q 参数后的“celery”,这样也可以。
所以旧命令是
celery -A my_app worker -B -l info -Q celery,queue1,queue2
新命令是
celery -A my_app worker -B -l info -Q queue1,queue2
我正在使用以下堆栈:
- Python 3.6
- Celery v4.2.1(经纪人:RabbitMQ v3.6.0)
- Django v2.0.4.
根据Celery's documentation,运行在不同队列上设置定时任务应该和为CELERY_ROUTES[=49=上的任务定义相应的队列一样简单],尽管如此,所有任务似乎都在 Celery 的默认队列中执行。
这是my_app/settings.py上的配置:
CELERY_BROKER_URL = "amqp://guest:guest@localhost:5672//"
CELERY_ROUTES = {
'app1.tasks.*': {'queue': 'queue1'},
'app2.tasks.*': {'queue': 'queue2'},
}
CELERY_BEAT_SCHEDULE = {
'app1_test': {
'task': 'app1.tasks.app1_test',
'schedule': 15,
},
'app2_test': {
'task': 'app2.tasks.app2_test',
'schedule': 15,
},
}
这些任务只是用于测试路由的简单脚本:
文件app1/tasks.py:
from my_app.celery import app
import time
@app.task()
def app1_test():
print('I am app1_test task!')
time.sleep(10)
文件app2/tasks.py:
from my_app.celery import app
import time
@app.task()
def app2_test():
print('I am app2_test task!')
time.sleep(10)
当我 运行 带有所有必需队列的 Celery 时:
celery -A my_app worker -B -l info -Q celery,queue1,queue2
RabbitMQ 将显示只有默认队列“celery”运行正在执行任务:
sudo rabbitmqctl list_queues
# Tasks executed by each queue:
# - celery 2
# - queue1 0
# - queue2 0
有人知道如何解决这种意外行为吗?
此致,
向装饰器添加 queue
参数可能会对您有所帮助,
@app.task(<b>queue='queue1'</b>)
def app1_test():
print('I am app1_test task!')
time.sleep(10)
我已经成功了,这里有几点需要注意:
根据Celery's 4.2.0 documentation,CELERY_ROUTES
应该是定义队列路由的变量,但它只对我使用 CELERY_TASK_ROUTES
有效。任务路由似乎独立于 Celery Beat,因此这仅适用于手动安排的任务:
app1_test.delay()
app2_test.delay()
或
app1_test.apply_async()
app2_test.apply_async()
为了让它与 Celery Beat 一起工作,我们只需要在 CELERY_BEAT_SCHEDULE
变量中明确定义队列。文件 my_app/settings.py
的最终设置如下:
CELERY_BROKER_URL = "amqp://guest:guest@localhost:5672//"
CELERY_TASK_ROUTES = {
'app1.tasks.*': {'queue': 'queue1'},
'app2.tasks.*': {'queue': 'queue2'},
}
CELERY_BEAT_SCHEDULE = {
'app1_test': {
'task': 'app1.tasks.app1_test',
'schedule': 15,
'options': {'queue': 'queue1'}
},
'app2_test': {
'task': 'app2.tasks.app2_test',
'schedule': 15,
'options': {'queue': 'queue2'}
},
}
然后 运行 芹菜监听这两个队列:
celery -A my_app worker -B -l INFO -Q queue1,queue2
在哪里
-A
: 项目或应用的名称。-B
:启动任务调度程序Celery beat。-l
: 定义日志级别。-Q
: 定义这个worker处理的队列。
我希望这能为其他开发者节省一些时间。
好的,因为我已经尝试了您用于 运行 worker 的相同命令,所以我发现您只需删除 -Q 参数后的“celery”,这样也可以。
所以旧命令是
celery -A my_app worker -B -l info -Q celery,queue1,queue2
新命令是
celery -A my_app worker -B -l info -Q queue1,queue2