Celery 任务被 SSE 客户端阻止

Celery tasks get blocked by SSE client

我是 运行 SSE 消费者,为了像这样更新一些表格

@app.task(ignore_result=True)
@worker_ready.connect
def xx_sse_nodes_uptime_info_consumer(sender, **kwargs):
   update_tables...

由于某种原因,当我们有一个 .delay 调用其他任务时,其他任务不会被触发。如果我删除上面的任务,其他任务将正常工作。

此外,当 CELERY_TASK_ALWAYS_EAGER=TrueCELERY_TASK_EAGER_PROPAGATES=True 一切正常时。

由于 SSE 消费者使用 stream=True,因此 xx_sse_nodes_uptime_info_consumer 永远不会结束。我不确定为什么这会阻止其他任务,因为有足够的线程。

解决了为此创建函数的问题

@worker_ready.connect
def at_worker_ready(sender, **k):
    with sender.app.connection() as conn:
        sender.app.send_task('nodeinfo.tasks.xx_sse_nodes_uptime_info_consumer', connection=conn)