芹菜任务队列不适用于 rabbitmq
celery tasks queue not working with rabbitmq
Celery 任务在没有队列的情况下成功执行
设置。
BROKER_URL = "amqp://user:pass@localhost:5672/test"
# Celery Data Format
CELERY_ACCEPT_CONTENT = ['application/json']
CELERY_TASK_SERIALIZER = 'json'
CELERYD_TASK_SOFT_TIME_LIMIT = 60
CELERY_IGNORE_RESULT = True
@app.task
def test(a,b,c):
print("doing something here...")
命令
celery worker -A proj -E -l INFO
上述设置工作器正在成功执行。
我在 celery 任务中引入了队列。
使用之前的设置添加了配置
from kombu.entity import Exchange, Queue
CELERY_QUEUES = (
Queue('high', Exchange('high'), routing_key='high'),
Queue('normal', Exchange('normal'), routing_key='normal'),
Queue('low', Exchange('low'), routing_key='low'),
)
CELERY_DEFAULT_QUEUE = 'normal'
CELERY_DEFAULT_EXCHANGE = 'normal'
CELERY_DEFAULT_ROUTING_KEY = 'normal'
CELERY_ROUTES = {
'myapp.tasks.test': {'queue': 'high'},
}
命令
celery worker -A proj -E -l INFO -n worker.high -Q high
通话
test.delay(1, 2, 3)
当我用队列工作者执行时不是运行。我是否遗漏了任何配置?
将 CELERY_ROUTES 更改为 CELERY_TASK_ROUTES- 在版本 4 中已更改
首先,确保在 rabbit 和 high worker 日志中都建立了连接。
然后,尝试将您的 CELERY_ROUTES
更改为:
CELERY_ROUTES = {
'myapp.tasks.test': {
'exchange': 'high',
'exchange_type': 'high',
'routing_key': 'high'
}
}
或者用queue
调用任务,例如:
test_task = test.signature(args=(1, 2, 3), queue='high', immutable=True)
test_task.apply_async()
Celery 任务在没有队列的情况下成功执行
设置。
BROKER_URL = "amqp://user:pass@localhost:5672/test"
# Celery Data Format
CELERY_ACCEPT_CONTENT = ['application/json']
CELERY_TASK_SERIALIZER = 'json'
CELERYD_TASK_SOFT_TIME_LIMIT = 60
CELERY_IGNORE_RESULT = True
@app.task
def test(a,b,c):
print("doing something here...")
命令
celery worker -A proj -E -l INFO
上述设置工作器正在成功执行。
我在 celery 任务中引入了队列。
使用之前的设置添加了配置
from kombu.entity import Exchange, Queue
CELERY_QUEUES = (
Queue('high', Exchange('high'), routing_key='high'),
Queue('normal', Exchange('normal'), routing_key='normal'),
Queue('low', Exchange('low'), routing_key='low'),
)
CELERY_DEFAULT_QUEUE = 'normal'
CELERY_DEFAULT_EXCHANGE = 'normal'
CELERY_DEFAULT_ROUTING_KEY = 'normal'
CELERY_ROUTES = {
'myapp.tasks.test': {'queue': 'high'},
}
命令
celery worker -A proj -E -l INFO -n worker.high -Q high
通话
test.delay(1, 2, 3)
当我用队列工作者执行时不是运行。我是否遗漏了任何配置?
将 CELERY_ROUTES 更改为 CELERY_TASK_ROUTES- 在版本 4 中已更改
首先,确保在 rabbit 和 high worker 日志中都建立了连接。
然后,尝试将您的 CELERY_ROUTES
更改为:
CELERY_ROUTES = {
'myapp.tasks.test': {
'exchange': 'high',
'exchange_type': 'high',
'routing_key': 'high'
}
}
或者用queue
调用任务,例如:
test_task = test.signature(args=(1, 2, 3), queue='high', immutable=True)
test_task.apply_async()