@task_postrun.connect 在 Celery 中发出信号并执行另一个任务会导致一些无限循环的执行
@task_postrun.connect signal in Celery and executing another task results in some infite loop of executions
我的 celery 任务需要遵循以下工作流程。
当 taskA 成功完成后,我想执行 taskB。
我知道有信号 @task_success
但这 returns 只是任务的结果,我需要访问先前任务参数的参数。所以我决定使用这样的代码:
@app.task
def taskA(arg):
# not cool, but... https://github.com/celery/celery/issues/3797
from shopify.tasks import taskA
taskA(arg)
@task_postrun.connect
def fetch_taskA_success_handler(sender=None, **kwargs):
from gcp.tasks import taskB
if kwargs.get('state') == 'SUCCESS':
taskB.apply_async((kwargs.get('args')[0], ))
问题是 taskB
似乎在无限循环中执行了很多很多次,而不是只执行了一次。
这样就可以正常工作了:
@app.task
def taskA(arg):
# not cool, but... https://github.com/celery/celery/issues/3797
# otherwise it won't added in periodic tasks
from shopify.tasks import taskA
return taskA(arg)
@task_postrun.connect
def taskA_success_handler(sender=None, state=None, **kwargs):
resource_name = kwargs.get('kwargs', {}).get('resource_name')
if resource_name and state == 'SUCCESS':
if sender.name == 'shopify.tasks.taskA':
from gcp.tasks import taskB
taskB.apply_async(kwargs={
'resource_name': resource_name
})
仅供参考:
celery==4.1.0
Django==2.0
django-celery-beat==1.1.0
django-celery-results==1.0.1
flower==0.9.2
amqp==2.2.2
Python 3.6
我的 celery 任务需要遵循以下工作流程。
当 taskA 成功完成后,我想执行 taskB。
我知道有信号 @task_success
但这 returns 只是任务的结果,我需要访问先前任务参数的参数。所以我决定使用这样的代码:
@app.task
def taskA(arg):
# not cool, but... https://github.com/celery/celery/issues/3797
from shopify.tasks import taskA
taskA(arg)
@task_postrun.connect
def fetch_taskA_success_handler(sender=None, **kwargs):
from gcp.tasks import taskB
if kwargs.get('state') == 'SUCCESS':
taskB.apply_async((kwargs.get('args')[0], ))
问题是 taskB
似乎在无限循环中执行了很多很多次,而不是只执行了一次。
这样就可以正常工作了:
@app.task
def taskA(arg):
# not cool, but... https://github.com/celery/celery/issues/3797
# otherwise it won't added in periodic tasks
from shopify.tasks import taskA
return taskA(arg)
@task_postrun.connect
def taskA_success_handler(sender=None, state=None, **kwargs):
resource_name = kwargs.get('kwargs', {}).get('resource_name')
if resource_name and state == 'SUCCESS':
if sender.name == 'shopify.tasks.taskA':
from gcp.tasks import taskB
taskB.apply_async(kwargs={
'resource_name': resource_name
})
仅供参考:
celery==4.1.0
Django==2.0
django-celery-beat==1.1.0
django-celery-results==1.0.1
flower==0.9.2
amqp==2.2.2
Python 3.6