芹菜如何将参数传递给链接任务(绑定)?
Celery how to pass arguments to linked task (which is bound)?
我有2个任务,其中一个任务成功后应该从第二个开始并通过结果。两个任务都是
bind=True
因为报告重试的逻辑。
我将此任务称为:
async def foobar(x: int):
task = foo.apply_async(kwargs={'x': x}, link=bar.si())
但不知道如何将结果传递给 bar 任务。 Docs 只显示没有绑定的例子,在我的例子中结果没有传递给第二个任务 bar
回溯显示 y 未通过:
Traceback (most recent call last):
File "/usr/local/lib/python3.8/site-packages/celery/app/trace.py", line 503, in trace_task
signature(callbacks[0], app=app).apply_async(
File "/usr/local/lib/python3.8/site-packages/celery/canvas.py", line 219, in apply_async
return _apply(args, kwargs, **options)
File "/usr/local/lib/python3.8/site-packages/celery/app/task.py", line 537, in apply_async
check_arguments(*(args or ()), **(kwargs or {}))
TypeError: bar() missing 1 required positional argument: 'y'
@celery_app.task(name="foo", bind=True,
autoretry_for=(Exception,),
default_retry_delay=1,
retry_backoff=2, # <- wait 1s before the 1st retry
max_retries=2,
retry_jitter=False)
def foo(self, x: int, *args):
logger.info(f'{self.request.id}, countdown: {self.request.retries}, max_retries: {self.max_retries}')
return {"x": x}
@celery_app.task(name="bar", bind=True,
autoretry_for=(Exception,),
default_retry_delay=1,
retry_backoff=2, # <- wait 1s before the 1st retry
max_retries=2,
retry_jitter=False)
def bar(self, y: int):
logger.info(f'{self.request.id}, countdown: {self.request.retries}, max_retries: {self.max_retries}')
return {"y_which_should_be_x": y}
我发现我可以使用:
task = (foo.s(x) | bar.s()).apply_async()
并且它按预期工作。
我有2个任务,其中一个任务成功后应该从第二个开始并通过结果。两个任务都是
bind=True
因为报告重试的逻辑。
我将此任务称为:
async def foobar(x: int):
task = foo.apply_async(kwargs={'x': x}, link=bar.si())
但不知道如何将结果传递给 bar 任务。 Docs 只显示没有绑定的例子,在我的例子中结果没有传递给第二个任务 bar
回溯显示 y 未通过:
Traceback (most recent call last):
File "/usr/local/lib/python3.8/site-packages/celery/app/trace.py", line 503, in trace_task
signature(callbacks[0], app=app).apply_async(
File "/usr/local/lib/python3.8/site-packages/celery/canvas.py", line 219, in apply_async
return _apply(args, kwargs, **options)
File "/usr/local/lib/python3.8/site-packages/celery/app/task.py", line 537, in apply_async
check_arguments(*(args or ()), **(kwargs or {}))
TypeError: bar() missing 1 required positional argument: 'y'
@celery_app.task(name="foo", bind=True,
autoretry_for=(Exception,),
default_retry_delay=1,
retry_backoff=2, # <- wait 1s before the 1st retry
max_retries=2,
retry_jitter=False)
def foo(self, x: int, *args):
logger.info(f'{self.request.id}, countdown: {self.request.retries}, max_retries: {self.max_retries}')
return {"x": x}
@celery_app.task(name="bar", bind=True,
autoretry_for=(Exception,),
default_retry_delay=1,
retry_backoff=2, # <- wait 1s before the 1st retry
max_retries=2,
retry_jitter=False)
def bar(self, y: int):
logger.info(f'{self.request.id}, countdown: {self.request.retries}, max_retries: {self.max_retries}')
return {"y_which_should_be_x": y}
我发现我可以使用:
task = (foo.s(x) | bar.s()).apply_async()
并且它按预期工作。