Celery 任务被 SSE 客户端阻止
Celery tasks get blocked by SSE client
我是 运行 SSE 消费者,为了像这样更新一些表格
@app.task(ignore_result=True)
@worker_ready.connect
def xx_sse_nodes_uptime_info_consumer(sender, **kwargs):
update_tables...
由于某种原因,当我们有一个 .delay
调用其他任务时,其他任务不会被触发。如果我删除上面的任务,其他任务将正常工作。
此外,当 CELERY_TASK_ALWAYS_EAGER=True
和 CELERY_TASK_EAGER_PROPAGATES=True
一切正常时。
由于 SSE 消费者使用 stream=True
,因此 xx_sse_nodes_uptime_info_consumer
永远不会结束。我不确定为什么这会阻止其他任务,因为有足够的线程。
解决了为此创建函数的问题
@worker_ready.connect
def at_worker_ready(sender, **k):
with sender.app.connection() as conn:
sender.app.send_task('nodeinfo.tasks.xx_sse_nodes_uptime_info_consumer', connection=conn)
我是 运行 SSE 消费者,为了像这样更新一些表格
@app.task(ignore_result=True)
@worker_ready.connect
def xx_sse_nodes_uptime_info_consumer(sender, **kwargs):
update_tables...
由于某种原因,当我们有一个 .delay
调用其他任务时,其他任务不会被触发。如果我删除上面的任务,其他任务将正常工作。
此外,当 CELERY_TASK_ALWAYS_EAGER=True
和 CELERY_TASK_EAGER_PROPAGATES=True
一切正常时。
由于 SSE 消费者使用 stream=True
,因此 xx_sse_nodes_uptime_info_consumer
永远不会结束。我不确定为什么这会阻止其他任务,因为有足够的线程。
解决了为此创建函数的问题
@worker_ready.connect
def at_worker_ready(sender, **k):
with sender.app.connection() as conn:
sender.app.send_task('nodeinfo.tasks.xx_sse_nodes_uptime_info_consumer', connection=conn)