芹菜如何将参数传递给链接任务(绑定)?

Celery how to pass arguments to linked task (which is bound)?

我有2个任务,其中一个任务成功后应该从第二个开始并通过结果。两个任务都是 bind=True 因为报告重试的逻辑。

我将此任务称为:

async def foobar(x: int):
    task = foo.apply_async(kwargs={'x': x}, link=bar.si())

但不知道如何将结果传递给 bar 任务。 Docs 只显示没有绑定的例子,在我的例子中结果没有传递给第二个任务 bar

回溯显示 y 未通过:

Traceback (most recent call last):
  File "/usr/local/lib/python3.8/site-packages/celery/app/trace.py", line 503, in trace_task
    signature(callbacks[0], app=app).apply_async(
  File "/usr/local/lib/python3.8/site-packages/celery/canvas.py", line 219, in apply_async
    return _apply(args, kwargs, **options)
  File "/usr/local/lib/python3.8/site-packages/celery/app/task.py", line 537, in apply_async
    check_arguments(*(args or ()), **(kwargs or {}))
TypeError: bar() missing 1 required positional argument: 'y'
@celery_app.task(name="foo", bind=True,
                 autoretry_for=(Exception,),
                 default_retry_delay=1,
                 retry_backoff=2,  # <- wait 1s before the 1st retry
                 max_retries=2,
                 retry_jitter=False)
def foo(self, x: int, *args):
    logger.info(f'{self.request.id}, countdown: {self.request.retries}, max_retries: {self.max_retries}')
    return {"x": x}


@celery_app.task(name="bar", bind=True,
                 autoretry_for=(Exception,),
                 default_retry_delay=1,
                 retry_backoff=2,  # <- wait 1s before the 1st retry
                 max_retries=2,
                 retry_jitter=False)
def bar(self, y: int):
    logger.info(f'{self.request.id}, countdown: {self.request.retries}, max_retries: {self.max_retries}')
    return {"y_which_should_be_x": y}

我发现我可以使用:

task = (foo.s(x) | bar.s()).apply_async() 并且它按预期工作。