Celery - 乱序执行的链式任务
Celery - chained tasks performed out of order
我正在向我的 celery worker 发送一连串的三个任务。第一个和第三个被添加到队列 "filestore",由工作人员 A 提供服务。第二个被添加到队列 "cloud",由工作人员 B 提供服务。
我想要的行为是三个任务依次执行,一个接一个。
我看到的行为是工人 A 执行任务 1,然后是任务 3,然后工人 B 执行任务 2。
result = app.send_task(
"workerTasks_filestore.task_upload_scan_to_s3", args=[scan.scan_name], queue='filestore',
chain=[
Signature('workerTasks.do_processing_task', args=[scan.scan_name, spd_name], queue=queue, immutable=True),
Signature('workerTasks_filestore.task_download_scan_from_s3', args=[scan.scan_name], queue='filestore', immutable=True),
]
)
我做错了什么?
您是否尝试使用 celery 中的链 class?
from celery import chain, Signature
chained_tasks = chain([
Signature('workerTasks_filestore.task_upload_scan_to_s3', args=(scan.scan_name,), queue='filestore'),
Signature('workerTasks.do_processing_task', args=(scan.scan_name, spd_name,), queue=queue, immutable=True),
Signature('workerTasks_filestore.task_download_scan_from_s3', args=(scan.scan_name,), queue='filestore', immutable=True)
])
result = chained_tasks.apply_async()
我正在向我的 celery worker 发送一连串的三个任务。第一个和第三个被添加到队列 "filestore",由工作人员 A 提供服务。第二个被添加到队列 "cloud",由工作人员 B 提供服务。
我想要的行为是三个任务依次执行,一个接一个。
我看到的行为是工人 A 执行任务 1,然后是任务 3,然后工人 B 执行任务 2。
result = app.send_task(
"workerTasks_filestore.task_upload_scan_to_s3", args=[scan.scan_name], queue='filestore',
chain=[
Signature('workerTasks.do_processing_task', args=[scan.scan_name, spd_name], queue=queue, immutable=True),
Signature('workerTasks_filestore.task_download_scan_from_s3', args=[scan.scan_name], queue='filestore', immutable=True),
]
)
我做错了什么?
您是否尝试使用 celery 中的链 class?
from celery import chain, Signature
chained_tasks = chain([
Signature('workerTasks_filestore.task_upload_scan_to_s3', args=(scan.scan_name,), queue='filestore'),
Signature('workerTasks.do_processing_task', args=(scan.scan_name, spd_name,), queue=queue, immutable=True),
Signature('workerTasks_filestore.task_download_scan_from_s3', args=(scan.scan_name,), queue='filestore', immutable=True)
])
result = chained_tasks.apply_async()