python所有celery组任务完成后如何调用方法
How to call the method after all the celery group tasks are completed in python
目前我正在做芹菜组任务,我想在所有任务完成后调用upload_local_directory(output_file)方法。我尝试了以下方法,但它没有等待工作完成。
tasks = [make_tmp_files.s(page.object_list, path + str(uuid.uuid4() + '.csv')) for page in paginator]
job = group(tasks)
job.apply_async()
job.get()
output_file = 'final.zip'
upload_local_directory_into_S3(output_file)
make_tmp_files
方法是芹菜作业方法。
“后端”也在芹菜对象中定义。
如果需要更多信息,请发表评论。
您可以 chain your group and the final, make_tmp_files
task, or you use Chord 完成相同的任务。如果您看到 Celery 自动将 group+task chain 转换为 Chord,请不要惊慌。
在 celery 中,一组任务和一组任务结果之间存在差异。如果您将代码更改为以下代码,它应该可以工作:
tasks = [make_tmp_files.s(page.object_list, path + str(uuid.uuid4() + '.csv')) for page in paginator]
job = group(tasks)
job_results = job.apply_async()
job_results.get()
output_file = 'final.zip'
upload_local_directory_into_S3(output_file)
job.apply_async()
returns 一组AsyncResults。作为用户,您需要检查 AsyncResult 的结果,而不是任务本身。
参考:https://docs.celeryproject.org/en/stable/userguide/canvas.html#groups
希望对您有所帮助!
目前我正在做芹菜组任务,我想在所有任务完成后调用upload_local_directory(output_file)方法。我尝试了以下方法,但它没有等待工作完成。
tasks = [make_tmp_files.s(page.object_list, path + str(uuid.uuid4() + '.csv')) for page in paginator]
job = group(tasks)
job.apply_async()
job.get()
output_file = 'final.zip'
upload_local_directory_into_S3(output_file)
make_tmp_files
方法是芹菜作业方法。
“后端”也在芹菜对象中定义。
如果需要更多信息,请发表评论。
您可以 chain your group and the final, make_tmp_files
task, or you use Chord 完成相同的任务。如果您看到 Celery 自动将 group+task chain 转换为 Chord,请不要惊慌。
在 celery 中,一组任务和一组任务结果之间存在差异。如果您将代码更改为以下代码,它应该可以工作:
tasks = [make_tmp_files.s(page.object_list, path + str(uuid.uuid4() + '.csv')) for page in paginator]
job = group(tasks)
job_results = job.apply_async()
job_results.get()
output_file = 'final.zip'
upload_local_directory_into_S3(output_file)
job.apply_async()
returns 一组AsyncResults。作为用户,您需要检查 AsyncResult 的结果,而不是任务本身。
参考:https://docs.celeryproject.org/en/stable/userguide/canvas.html#groups
希望对您有所帮助!