芹菜和弦回调未执行

Celery chord callback not executing

几天来我一直在尝试解决这个问题,但仍然没有找到任何解决方案。我有以下代码:

@app.task(name='taggers')
def taggers(text: str) -> dict:
   return {'tags': get_tags(text), 'text': text}


@app.task(name='entitied')
def entitied(text: str) -> dict:
   return {'entities': get_entities(text)}


@app.task(name='persist_task_es')
def persist_task_es(data: list, document: dict) -> None:
   logger.error('persist task')
   TaskElastic.create_or_updated_task_document(document, data)


@app.task(name='init_planner', ignore_result=True, rate_limit='1/s')
def init_planner() -> None:
   try:
       user_tasks = get_user_tasks()
       for user_task in user_tasks:
           text = f"{user_task['title']} {user_task['plan']['title']}"
           chained_tasks = chain(chord([taggers.s(text), entitied.s(text)], persist_task_es.s(user_task)))
           chained_tasks.delay()

   except Exception as e:
       logger.error(e)
       logger.info('Indexation error', extra={
           "error": e
       })

正常执行,没有报错。问题是任务 persist_task_es 永远不会执行。我已经尝试了几种替代方案(比如这个 Celery not running chord callback),但仍然没有执行。你能帮我解决这个问题吗?谢谢。

我终于想出了一个解决方案。该任务实际上正在执行,但我看不到输出,因为它使用了延迟 (chained_tasks.delay())。我将其更改为 chained_tasks.apply() 然后我可以看到输出并看到该任务有一些我以前看不到的错误。