如何使用 Django/Celery 接收调用多个任务并收集任务状态
How to receive invoke multiple tasks and collect task status with Django/Celery
我正在 tasks.py 中设置多个任务,并从 views.py 调用这些任务。我想在 for 循环中调用所有不同的任务,这样我就可以轻松地收集状态并构建进度条。我目前可以逐行调用所有任务(如下所示)。
这是我的问题:如何在 for 循环中从 views.py 调用不同的任务?它总是给我一个 "unicode does not have attribute delay()" 的错误。或者有没有更好的方法来收集不同任务的状态并从中构建进度条?
我试过像这样调用 views.py 中的函数:
对于范围 (1, 6) 中的 i:
函数名 = "calculation" + str(i)
functionName.delay(账号)
但这会产生如上所述的错误 "unicode does not have attribute delay()"
我猜任务是从 tasks.py 导入到 views.py
我目前的tasks.py:
@shared_task
def calculation1(arg):
some action here
@shared_task
def calculation2(arg):
some action here
@shared_task
def calculation3(arg):
some action here
@shared_task
def calculation4(arg):
some action here
@shared_task
def calculation5(arg):
some action here
我的views.py:
result_calculation1= calculation1.delay(accountNumber)
result_calculation2 = calculation2.delay(accountNumber)
result_calculation3 = calculation3.delay(accountNumber)
result_calculation4= calculation4.delay(accountNumber)
result_calculation5 = calculation5.delay(accountNumber)
我想在一个for循环中收集所有任务状态,这样我就可以构建一个进度条,但是如果有任何其他关于收集任务状态和构建进度条的更好建议,那就太好了。
非常感谢您的提前帮助。
你可以把任务放到group and you can receive a GroupResult. you can refer to celery doc
您需要使用 getattr()
从 tasks.py
模块中检索函数,一旦您构建了名称:
from myapp import tasks # Make sure you import the tasks module
for i in range (1, 6):
functionName = "calculation" + str(i)
task = getattr(tasks, functionName) # Get the task by name from the tasks module
获取任务函数后,您可以建立签名列表:
signatures = []
signatures.append(task.s(accountNumber)) # Add task signature
根据签名,您可以创建一个组并将该组作为一个整体执行:
from celery import group
task_group = group(signatures)
group_result = group() # Execute the group
并且从 group_result
您可以访问每个单独的任务结果并围绕它构建进度条(可能在 group_result
中迭代结果并检查每个结果的 status
):
for result in group_result:
status = result.status
# Your progress bar logic...
综合起来:
from celery import group
from myapp import tasks # Make sure you import the tasks module
signatures = []
for i in range (1, 6):
functionName = "calculation" + str(i)
task = getattr(tasks, functionName) # Get the task from the tasks module
signatures.append(task.s(accountNumber)) # Add each task signature
task_group = group(signatures)
group_result = group() # Execute the group
for result in group_result:
status = result.status
# Your progress bar logic...
我正在 tasks.py 中设置多个任务,并从 views.py 调用这些任务。我想在 for 循环中调用所有不同的任务,这样我就可以轻松地收集状态并构建进度条。我目前可以逐行调用所有任务(如下所示)。
这是我的问题:如何在 for 循环中从 views.py 调用不同的任务?它总是给我一个 "unicode does not have attribute delay()" 的错误。或者有没有更好的方法来收集不同任务的状态并从中构建进度条?
我试过像这样调用 views.py 中的函数: 对于范围 (1, 6) 中的 i: 函数名 = "calculation" + str(i) functionName.delay(账号) 但这会产生如上所述的错误 "unicode does not have attribute delay()"
我猜任务是从 tasks.py 导入到 views.py
我目前的tasks.py:
@shared_task
def calculation1(arg):
some action here
@shared_task
def calculation2(arg):
some action here
@shared_task
def calculation3(arg):
some action here
@shared_task
def calculation4(arg):
some action here
@shared_task
def calculation5(arg):
some action here
我的views.py:
result_calculation1= calculation1.delay(accountNumber)
result_calculation2 = calculation2.delay(accountNumber)
result_calculation3 = calculation3.delay(accountNumber)
result_calculation4= calculation4.delay(accountNumber)
result_calculation5 = calculation5.delay(accountNumber)
我想在一个for循环中收集所有任务状态,这样我就可以构建一个进度条,但是如果有任何其他关于收集任务状态和构建进度条的更好建议,那就太好了。
非常感谢您的提前帮助。
你可以把任务放到group and you can receive a GroupResult. you can refer to celery doc
您需要使用 getattr()
从 tasks.py
模块中检索函数,一旦您构建了名称:
from myapp import tasks # Make sure you import the tasks module
for i in range (1, 6):
functionName = "calculation" + str(i)
task = getattr(tasks, functionName) # Get the task by name from the tasks module
获取任务函数后,您可以建立签名列表:
signatures = []
signatures.append(task.s(accountNumber)) # Add task signature
根据签名,您可以创建一个组并将该组作为一个整体执行:
from celery import group
task_group = group(signatures)
group_result = group() # Execute the group
并且从 group_result
您可以访问每个单独的任务结果并围绕它构建进度条(可能在 group_result
中迭代结果并检查每个结果的 status
):
for result in group_result:
status = result.status
# Your progress bar logic...
综合起来:
from celery import group
from myapp import tasks # Make sure you import the tasks module
signatures = []
for i in range (1, 6):
functionName = "calculation" + str(i)
task = getattr(tasks, functionName) # Get the task from the tasks module
signatures.append(task.s(accountNumber)) # Add each task signature
task_group = group(signatures)
group_result = group() # Execute the group
for result in group_result:
status = result.status
# Your progress bar logic...