RabbitMQ 和 Celery:订阅工作完成事件
RabbitMQ and Celery: subscribe to job done event
我有一个简单的 Celery task.py
运行 RabbitMQ 消息代理和 Redis 数据存储
from celery import Celery
app = Celery('tasks', broker='pyamqp://guest@localhost//', backend="redis://localhost:6379/0")
@app.task
def add(x, y):
return x + y
和一个listener.py
服务,功能很简单
def on_add(result):
# Do something with the result.
我想以即发即弃的方式调用 add()
,让另一个实现 on_add()
的服务处理结果。
这是工作流图:
如何在 Celery 的后端 Redis 上创建一个订阅任务完成事件的侦听器?
您至少有两个选择:
- 使用信号 - task-postrun 例如:
@task_postrun.connect
def task_postrun_handler(task_id, task, args, retval, **kwargs):
if task.name == "add":
on_add(retval)
请注意,它将 运行 在同一个 celery worker 中。
- 如果需要在单独的进程中,可以采用flower的方式listen to the broker's events(比较复杂)
我有一个简单的 Celery task.py
运行 RabbitMQ 消息代理和 Redis 数据存储
from celery import Celery
app = Celery('tasks', broker='pyamqp://guest@localhost//', backend="redis://localhost:6379/0")
@app.task
def add(x, y):
return x + y
和一个listener.py
服务,功能很简单
def on_add(result):
# Do something with the result.
我想以即发即弃的方式调用 add()
,让另一个实现 on_add()
的服务处理结果。
这是工作流图:
如何在 Celery 的后端 Redis 上创建一个订阅任务完成事件的侦听器?
您至少有两个选择:
- 使用信号 - task-postrun 例如:
@task_postrun.connect
def task_postrun_handler(task_id, task, args, retval, **kwargs):
if task.name == "add":
on_add(retval)
请注意,它将 运行 在同一个 celery worker 中。
- 如果需要在单独的进程中,可以采用flower的方式listen to the broker's events(比较复杂)