RabbitMQ 和 Celery:订阅工作完成事件

RabbitMQ and Celery: subscribe to job done event

我有一个简单的 Celery task.py 运行 RabbitMQ 消息代理和 Redis 数据存储

from celery import Celery

app = Celery('tasks', broker='pyamqp://guest@localhost//', backend="redis://localhost:6379/0")

@app.task
def add(x, y):
    return x + y

和一个listener.py服务,功能很简单

def on_add(result):
    # Do something with the result.

我想以即发即弃的方式调用 add(),让另一个实现 on_add() 的服务处理结果。

这是工作流图:

如何在 Celery 的后端 Redis 上创建一个订阅任务完成事件的侦听器?

您至少有两个选择:

  1. 使用信号 - task-postrun 例如:
@task_postrun.connect
def task_postrun_handler(task_id, task, args, retval, **kwargs):
    if task.name == "add":
        on_add(retval)

请注意,它将 运行 在同一个 celery worker 中。

  1. 如果需要在单独的进程中,可以采用flower的方式listen to the broker's events(比较复杂)