芹菜和弦回调未执行
Celery chord callback not executing
几天来我一直在尝试解决这个问题,但仍然没有找到任何解决方案。我有以下代码:
@app.task(name='taggers')
def taggers(text: str) -> dict:
return {'tags': get_tags(text), 'text': text}
@app.task(name='entitied')
def entitied(text: str) -> dict:
return {'entities': get_entities(text)}
@app.task(name='persist_task_es')
def persist_task_es(data: list, document: dict) -> None:
logger.error('persist task')
TaskElastic.create_or_updated_task_document(document, data)
@app.task(name='init_planner', ignore_result=True, rate_limit='1/s')
def init_planner() -> None:
try:
user_tasks = get_user_tasks()
for user_task in user_tasks:
text = f"{user_task['title']} {user_task['plan']['title']}"
chained_tasks = chain(chord([taggers.s(text), entitied.s(text)], persist_task_es.s(user_task)))
chained_tasks.delay()
except Exception as e:
logger.error(e)
logger.info('Indexation error', extra={
"error": e
})
正常执行,没有报错。问题是任务 persist_task_es 永远不会执行。我已经尝试了几种替代方案(比如这个 Celery not running chord callback),但仍然没有执行。你能帮我解决这个问题吗?谢谢。
我终于想出了一个解决方案。该任务实际上正在执行,但我看不到输出,因为它使用了延迟 (chained_tasks.delay()
)。我将其更改为 chained_tasks.apply()
然后我可以看到输出并看到该任务有一些我以前看不到的错误。
几天来我一直在尝试解决这个问题,但仍然没有找到任何解决方案。我有以下代码:
@app.task(name='taggers')
def taggers(text: str) -> dict:
return {'tags': get_tags(text), 'text': text}
@app.task(name='entitied')
def entitied(text: str) -> dict:
return {'entities': get_entities(text)}
@app.task(name='persist_task_es')
def persist_task_es(data: list, document: dict) -> None:
logger.error('persist task')
TaskElastic.create_or_updated_task_document(document, data)
@app.task(name='init_planner', ignore_result=True, rate_limit='1/s')
def init_planner() -> None:
try:
user_tasks = get_user_tasks()
for user_task in user_tasks:
text = f"{user_task['title']} {user_task['plan']['title']}"
chained_tasks = chain(chord([taggers.s(text), entitied.s(text)], persist_task_es.s(user_task)))
chained_tasks.delay()
except Exception as e:
logger.error(e)
logger.info('Indexation error', extra={
"error": e
})
正常执行,没有报错。问题是任务 persist_task_es 永远不会执行。我已经尝试了几种替代方案(比如这个 Celery not running chord callback),但仍然没有执行。你能帮我解决这个问题吗?谢谢。
我终于想出了一个解决方案。该任务实际上正在执行,但我看不到输出,因为它使用了延迟 (chained_tasks.delay()
)。我将其更改为 chained_tasks.apply()
然后我可以看到输出并看到该任务有一些我以前看不到的错误。