远程 celery worker 完成后通知或回调 flask
Notify or callback flask after remote celery worker is completed
我是运行 celery client(Flask) 和 worker 在两台不同的机器上,现在一旦 worker 完成任务,我需要在客户端回调一个函数。这可能吗?
芹菜客户端:-
celery_app=Celery('test_multihost', broker='amqp://test:test@<worker_ip>/test_host', backend='rpc')
result= testMethod1.apply_async((param1, param2,param3), link=testMethod2.s())
@celery_app.task
def testMethod2():
#testMethod2 body.
芹菜工人:-
celery_app=Celery('test_multihost', broker='amqp://test:test@<worker_ip>/test_host', backend='rpc')
@celery_app.task
def testMethod1():
#testMethod1 body
但问题是函数 testMethod2 在 celery worker 端执行,而不是在客户端执行。
我是否可以在客户端回调该方法?
一种方法是让 Celery 将其结果写入数据库 table,然后使用 Flask 通过反复查询数据库来轮询任务的结果。类似的构造可能是在 Redis 中保留已完成任务的寄存器,但要点是相同的。
是否要向用户触发完成消息?如果您可以通过 email/text 消息通知,您当然可以让 Celery 处理它。
如果您需要启动一些 Flask 进程——并且出于某种原因它确实需要在 Flask 的生态系统中——使用带有 requests
模块的 worker 来调用 Flask 正在监听的端点。
我使用芹菜信号中的@after_task_publish解决了这个问题。
代码片段如下:-
@after_task_publish.connect(sender=<registered_celery_task>)
def testMethod2(sender=None, headers=None, body=None, **kwargs):
#callback body
在远程机器上完成 celery worker 后,将调用 testMethod2。
在这里,我可以使用 headers 参数访问 celery worker 的结果。
我是运行 celery client(Flask) 和 worker 在两台不同的机器上,现在一旦 worker 完成任务,我需要在客户端回调一个函数。这可能吗?
芹菜客户端:-
celery_app=Celery('test_multihost', broker='amqp://test:test@<worker_ip>/test_host', backend='rpc')
result= testMethod1.apply_async((param1, param2,param3), link=testMethod2.s())
@celery_app.task
def testMethod2():
#testMethod2 body.
芹菜工人:-
celery_app=Celery('test_multihost', broker='amqp://test:test@<worker_ip>/test_host', backend='rpc')
@celery_app.task
def testMethod1():
#testMethod1 body
但问题是函数 testMethod2 在 celery worker 端执行,而不是在客户端执行。
我是否可以在客户端回调该方法?
一种方法是让 Celery 将其结果写入数据库 table,然后使用 Flask 通过反复查询数据库来轮询任务的结果。类似的构造可能是在 Redis 中保留已完成任务的寄存器,但要点是相同的。
是否要向用户触发完成消息?如果您可以通过 email/text 消息通知,您当然可以让 Celery 处理它。
如果您需要启动一些 Flask 进程——并且出于某种原因它确实需要在 Flask 的生态系统中——使用带有 requests
模块的 worker 来调用 Flask 正在监听的端点。
我使用芹菜信号中的@after_task_publish解决了这个问题。 代码片段如下:-
@after_task_publish.connect(sender=<registered_celery_task>)
def testMethod2(sender=None, headers=None, body=None, **kwargs):
#callback body
在远程机器上完成 celery worker 后,将调用 testMethod2。 在这里,我可以使用 headers 参数访问 celery worker 的结果。