远程 celery worker 完成后通知或回调 flask

Notify or callback flask after remote celery worker is completed

我是运行 celery client(Flask) 和 worker 在两台不同的机器上,现在一旦 worker 完成任务,我需要在客户端回调一个函数。这可能吗?

芹菜客户端:-

celery_app=Celery('test_multihost', broker='amqp://test:test@<worker_ip>/test_host', backend='rpc')
result= testMethod1.apply_async((param1, param2,param3), link=testMethod2.s())

@celery_app.task
def testMethod2():
    #testMethod2 body.

芹菜工人:-

celery_app=Celery('test_multihost', broker='amqp://test:test@<worker_ip>/test_host', backend='rpc')
@celery_app.task
def testMethod1():
   #testMethod1 body

但问题是函数 testMethod2 在 celery worker 端执行,而不是在客户端执行。

我是否可以在客户端回调该方法?

一种方法是让 Celery 将其结果写入数据库 table,然后使用 Flask 通过反复查询数据库来轮询任务的结果。类似的构造可能是在 Redis 中保留已完成任务的寄存器,但要点是相同的。

是否要向用户触发完成消息?如果您可以通过 email/text 消息通知,您当然可以让 Celery 处理它。

如果您需要启动一些 Flask 进程——并且出于某种原因它确实需要在 Flask 的生态系统中——使用带有 requests 模块的 worker 来调用 Flask 正在监听的端点。

我使用芹菜信号中的@after_task_publish解决了这个问题。 代码片段如下:-

@after_task_publish.connect(sender=<registered_celery_task>)
def testMethod2(sender=None, headers=None, body=None, **kwargs):
    #callback body

在远程机器上完成 celery worker 后,将调用 testMethod2。 在这里,我可以使用 headers 参数访问 celery worker 的结果。