如何在 Celery 中的队列之间切换任务
How to switch tasks between queues in Celery
我在 Celery 中的 tasks.py
中有几个任务。
# this should go to the 'math' queue
@app.task
def add(x,y):
uuid = uuid.uuid4()
result = x + y
return {'id': uuid, 'result': result}
# this should go to the 'info' queue
@app.task
def notification(calculation):
print repr(calculation)
我想做的是将这些任务中的每一个都放在一个单独的 Celery 队列中,然后在每个队列上分配一些工作人员。
问题是我不知道在我的代码中将任务从一个队列放置到另一个队列的方法。
因此,例如当 add
任务完成执行时,我需要一种方法将生成的 python 字典放入 info
队列以供进一步处理。我该怎么做?
提前致谢。
编辑-澄清-
正如我在评论中所说,问题本质上变成了工作人员如何将从 queue A
检索到的数据放置到 queue B
。
你可以这样试试
无论在哪里调用任务,都可以将任务分配到哪个队列。
add.apply_async(queue="queuename1")
notification.apply_async(queue="queuename2")
通过这种方式,您可以将任务放入单独的队列中。
独立队列的工人
celery -A proj -Q queuename1 -l info
celery -A proj -Q queuename2 -l info
但是你必须知道 default
队列是 celery
。所以如果没有指定 queue name
的任何任务将转到 celery
queue.So [的消费者=16=] 如有需要,请提供。
celery -A proj -Q queuename1,celery -l info
你期待的答案
如果你想将一个任务的结果传递给another.Then
result = add.apply_async(queue="queuename1")
result = result.get() #This contain the return value of task
然后
notification.apply_async(args=[result], queue="queuename2")
我在 Celery 中的 tasks.py
中有几个任务。
# this should go to the 'math' queue
@app.task
def add(x,y):
uuid = uuid.uuid4()
result = x + y
return {'id': uuid, 'result': result}
# this should go to the 'info' queue
@app.task
def notification(calculation):
print repr(calculation)
我想做的是将这些任务中的每一个都放在一个单独的 Celery 队列中,然后在每个队列上分配一些工作人员。
问题是我不知道在我的代码中将任务从一个队列放置到另一个队列的方法。
因此,例如当 add
任务完成执行时,我需要一种方法将生成的 python 字典放入 info
队列以供进一步处理。我该怎么做?
提前致谢。
编辑-澄清-
正如我在评论中所说,问题本质上变成了工作人员如何将从 queue A
检索到的数据放置到 queue B
。
你可以这样试试
无论在哪里调用任务,都可以将任务分配到哪个队列。
add.apply_async(queue="queuename1")
notification.apply_async(queue="queuename2")
通过这种方式,您可以将任务放入单独的队列中。
独立队列的工人
celery -A proj -Q queuename1 -l info
celery -A proj -Q queuename2 -l info
但是你必须知道 default
队列是 celery
。所以如果没有指定 queue name
的任何任务将转到 celery
queue.So [的消费者=16=] 如有需要,请提供。
celery -A proj -Q queuename1,celery -l info
你期待的答案
如果你想将一个任务的结果传递给another.Then
result = add.apply_async(queue="queuename1")
result = result.get() #This contain the return value of task
然后
notification.apply_async(args=[result], queue="queuename2")