如何使用 Django/Celery 接收调用多个任务并收集任务状态

How to receive invoke multiple tasks and collect task status with Django/Celery

我正在 tasks.py 中设置多个任务,并从 views.py 调用这些任务。我想在 for 循环中调用所有不同的任务,这样我就可以轻松地收集状态并构建进度条。我目前可以逐行调用所有任务(如下所示)。

这是我的问题:如何在 for 循环中从 views.py 调用不同的任务?它总是给我一个 "unicode does not have attribute delay()" 的错误。或者有没有更好的方法来收集不同任务的状态并从中构建进度条?

我试过像这样调用 views.py 中的函数: 对于范围 (1, 6) 中的 i: 函数名 = "calculation" + str(i) functionName.delay(账号) 但这会产生如上所述的错误 "unicode does not have attribute delay()"

我猜任务是从 tasks.py 导入到 views.py

我目前的tasks.py:

@shared_task
def calculation1(arg):
     some action here

@shared_task
def calculation2(arg):
    some action here

@shared_task
def calculation3(arg):
    some action here

@shared_task
def calculation4(arg):
    some action here

@shared_task
def calculation5(arg):
    some action here

我的views.py:

result_calculation1= calculation1.delay(accountNumber)
result_calculation2 = calculation2.delay(accountNumber)
result_calculation3 = calculation3.delay(accountNumber)
result_calculation4= calculation4.delay(accountNumber)
result_calculation5 = calculation5.delay(accountNumber)

我想在一个for循环中收集所有任务状态,这样我就可以构建一个进度条,但是如果有任何其他关于收集任务状态和构建进度条的更好建议,那就太好了。

非常感谢您的提前帮助。

你可以把任务放到group and you can receive a GroupResult. you can refer to celery doc

您需要使用 getattr()tasks.py 模块中检索函数,一旦您构建了名称:

from myapp import tasks  # Make sure you import the tasks module

for i in range (1, 6): 
    functionName = "calculation" + str(i)  
    task = getattr(tasks, functionName)  # Get the task by name from the tasks module

获取任务函数后,您可以建立签名列表:

signatures = []
signatures.append(task.s(accountNumber))  # Add task signature

根据签名,您可以创建一个组并将该组作为一个整体执行:

from celery import group

task_group = group(signatures)
group_result = group()  # Execute the group

并且从 group_result 您可以访问每个单独的任务结果并围绕它构建进度条(可能在 group_result 中迭代结果并检查每个结果的 status):

for result in group_result:
    status = result.status
    # Your progress bar logic...

综合起来:

from celery import group
from myapp import tasks  # Make sure you import the tasks module

signatures = []

for i in range (1, 6): 
    functionName = "calculation" + str(i) 
    task = getattr(tasks, functionName)  # Get the task from the tasks module
    signatures.append(task.s(accountNumber))  # Add each task signature

task_group = group(signatures)
group_result = group()  # Execute the group

for result in group_result:
    status = result.status
    # Your progress bar logic...