链中的 Celery 任务无序启动
Celery Tasks in Chain Starting Out Of Order
我正在尝试使用 django 3.0、celery 4.3、redis 和 python 3.6 实现一些 celery chains/groups/chords。从文档中,我认为一组 运行 中的任务是并行的,而链中的任务 运行 是顺序的,但我没有观察到这种行为。
我有这个任务签名链:
transaction.on_commit(lambda: chain(clean, group(chain(faces_1, faces_2), folder, hashes), change_state_task.si(document_id, 'ready')).delay())
我希望 change_state_task
在开始之前等待所有其他任务完成。这没有用,因为 change_state_task
在 hashes
完成之前开始。所有任务 运行 并成功完成。
然后我尝试了这条链:
transaction.on_commit(lambda: chain(clean, faces_1, faces_2, folder, hashes, change_state_task.si(document_id, 'ready')).delay())
所有签名都在一个长链中。但是,change_state_task
仍在 hashes
任务完成之前开始。
我什至尝试使用change_state_task.s(document_id, 'ready')
(将si替换为s),认为change_state_task
没有hashes
任务的输出就无法启动。但它仍然在哈希结束之前开始。
然后我尝试对所有任务签名使用 task.s
与 task.si
,并且 change_state_task
仍然在 hashes
任务结束之前开始。
我错过了什么?
谢谢!
马克
PS 很抱歉我的任务签名不清楚。我有一个很长的 Python 方法,可以确定哪些任务必须是 运行。它看起来像这样:
@app.task(bind=True)
def noop(self, message):
# Task accepts a string and does nothing
logger.debug(message)
return True
def figure_out_which_tasks_to_fire(document_id):
clean = noop.si("replaces clean_document_image task")
faces_1 = noop.si("replaces find_faces_task task")
faces_2 = noop.si("replaces recognize_face_task task")
folder = noop.si("replaces update_source_folder task")
hashes = noop.si("replaces compute_image_descriptor_task task")
if clean_needed:
clean = clean_document_image.s(document_id, key, value)
if faces_needed:
faces_1 = find_faces_task.s(document_id)
faces_2 = recognize_face_task.s(document_id)
if folder_needed:
folder = update_source_folder.s(document_id, file_name, source_folder)
if hashes_needed:
hashes = compute_image_descriptor_task.s(settings.DEFAULT_SIMILAR_IMAGE, document_id, hash_name)
# Finished figuring out what needs to be done, so do the tasks
# and then update the state of the document.
transaction.on_commit(lambda: chain(clean, faces_1, faces_2, folder, hashes, change_state_task.s(document_id, 'ready')).delay())
我需要 transaction.on_commit
因为所有任务都读取和写入 Django 应用程序的后端 mysql 数据库。
我遇到了芹菜自动将链式组转换为 chord
的问题。尝试专门使用 chord()
函数。
我正在尝试使用 django 3.0、celery 4.3、redis 和 python 3.6 实现一些 celery chains/groups/chords。从文档中,我认为一组 运行 中的任务是并行的,而链中的任务 运行 是顺序的,但我没有观察到这种行为。
我有这个任务签名链:
transaction.on_commit(lambda: chain(clean, group(chain(faces_1, faces_2), folder, hashes), change_state_task.si(document_id, 'ready')).delay())
我希望 change_state_task
在开始之前等待所有其他任务完成。这没有用,因为 change_state_task
在 hashes
完成之前开始。所有任务 运行 并成功完成。
然后我尝试了这条链:
transaction.on_commit(lambda: chain(clean, faces_1, faces_2, folder, hashes, change_state_task.si(document_id, 'ready')).delay())
所有签名都在一个长链中。但是,change_state_task
仍在 hashes
任务完成之前开始。
我什至尝试使用change_state_task.s(document_id, 'ready')
(将si替换为s),认为change_state_task
没有hashes
任务的输出就无法启动。但它仍然在哈希结束之前开始。
然后我尝试对所有任务签名使用 task.s
与 task.si
,并且 change_state_task
仍然在 hashes
任务结束之前开始。
我错过了什么?
谢谢!
马克
PS 很抱歉我的任务签名不清楚。我有一个很长的 Python 方法,可以确定哪些任务必须是 运行。它看起来像这样:
@app.task(bind=True)
def noop(self, message):
# Task accepts a string and does nothing
logger.debug(message)
return True
def figure_out_which_tasks_to_fire(document_id):
clean = noop.si("replaces clean_document_image task")
faces_1 = noop.si("replaces find_faces_task task")
faces_2 = noop.si("replaces recognize_face_task task")
folder = noop.si("replaces update_source_folder task")
hashes = noop.si("replaces compute_image_descriptor_task task")
if clean_needed:
clean = clean_document_image.s(document_id, key, value)
if faces_needed:
faces_1 = find_faces_task.s(document_id)
faces_2 = recognize_face_task.s(document_id)
if folder_needed:
folder = update_source_folder.s(document_id, file_name, source_folder)
if hashes_needed:
hashes = compute_image_descriptor_task.s(settings.DEFAULT_SIMILAR_IMAGE, document_id, hash_name)
# Finished figuring out what needs to be done, so do the tasks
# and then update the state of the document.
transaction.on_commit(lambda: chain(clean, faces_1, faces_2, folder, hashes, change_state_task.s(document_id, 'ready')).delay())
我需要 transaction.on_commit
因为所有任务都读取和写入 Django 应用程序的后端 mysql 数据库。
我遇到了芹菜自动将链式组转换为 chord
的问题。尝试专门使用 chord()
函数。