celery "retry" with updated arguments

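Consider a task that takes a list as an argument and processes each element of the list, where each element may succeed or fail. In this case, how can I "retry" only the failed elements?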

Example:

@app.task(bind=True)
def my_test(self, my_list:list):
    new_list = []
    for ele in my_list:
        try:
            do_something_may_fail(ele)
        except:
            new_list.append(ele)
    # how to retry with the new list?
    # like 
    # self.retry(my_list=new_list, countdown=5)
    # or
    # self.apply_async(new_list, countdown=5)

Solution 1

Use Task.retry with its args and kwargs parameters.

retry(args=None, kwargs=None, exc=None, throw=True, eta=None, countdown=None, max_retries=None, **options)

Retry the task, adding it to the back of the queue.

Parameters

args (Tuple) – Positional arguments to retry with.

kwargs (Dict) – Keyword arguments to retry with.

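Be careful when passing the arguments: setting a value for the same parameter in both args and kwargs will make the retried task fail. Below, I chose to use only args=(<values here>) and to empty out kwargs={}. You can also do it the other way around by using kwargs={<values here>} and emptying out args=().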
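The failure described above can be reproduced without a broker. As far as I can tell from Celery's source, when retry() is given `args` or `kwargs` as `None`, the retried message reuses the original request's value for that one. So passing new `args` while leaving `kwargs` unset, after the task was first called with keyword arguments, hands each parameter to the function twice. The sketch below models that fallback with a hypothetical `build_retry_call` helper and a plain stand-in function; it is a simplified model, not Celery's actual code.

```python
def my_test_body(some_arg_1: int, my_list: list, some_arg_2: str):
    """Plain-function stand-in for the task body (no Celery needed)."""
    return some_arg_1, my_list, some_arg_2


def build_retry_call(request_args, request_kwargs, args=None, kwargs=None):
    """Hypothetical helper: mimics retry() falling back to the original
    request's args/kwargs when the new ones are left as None."""
    new_args = request_args if args is None else args
    new_kwargs = request_kwargs if kwargs is None else kwargs
    return tuple(new_args), dict(new_kwargs)


# The original call used keyword arguments only.
orig_args = ()
orig_kwargs = {"some_arg_1": 0, "my_list": [1, 2, 3, 4, 5], "some_arg_2": "a"}

# 1) New args, kwargs left as None: the old kwargs are reused, so values collide.
a, k = build_retry_call(orig_args, orig_kwargs, args=(1, [1, 2, 3, 4], "aa"))
try:
    my_test_body(*a, **k)
except TypeError as err:
    print("conflict:", err)  # ... got multiple values for argument 'some_arg_1'

# 2) New args with kwargs explicitly emptied: works.
a, k = build_retry_call(orig_args, orig_kwargs, args=(1, [1, 2, 3, 4], "aa"), kwargs={})
print(my_test_body(*a, **k))

# 3) The other way around: new kwargs with args explicitly emptied.
a, k = build_retry_call(
    orig_args, orig_kwargs, args=(),
    kwargs={"some_arg_1": 1, "my_list": [1, 2, 3, 4], "some_arg_2": "aa"},
)
print(my_test_body(*a, **k))
```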

tasks.py

from celery import Celery

app = Celery('tasks')


@app.task(
    bind=True,
    default_retry_delay=0.1,
    retry_backoff=False,
    max_retries=None,
)
def my_test(self, some_arg_1: int, my_list: list, some_arg_2: str):
    print(f"my_test {some_arg_1} {my_list} {some_arg_2}")

    # Filter the failed items. Here, let's say only the last item is successful.
    new_list = my_list[:-1]

    if new_list:
        self.retry(
            args=(
                some_arg_1 + 1,  # some_arg_1 increments per retry
                new_list,  # Failed items
                some_arg_2 * 2,  # some_arg_2's length doubles per retry
            ),
            kwargs={},  # Empty it out to avoid having multiple values for the arguments whether we initially called it with args or kwargs or both.
        )
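The task above fakes the filtering step with `my_list[:-1]`. In a real task, `new_list` would come from actually attempting each item, as in the question. A minimal sketch of that step follows; `fails_on_odd` is a hypothetical stand-in for `do_something_may_fail`, and catching `Exception` instead of using a bare `except:` is my own choice, so that `KeyboardInterrupt` and `SystemExit` are not swallowed.

```python
from typing import Callable, Iterable, List, TypeVar

T = TypeVar("T")


def collect_failures(items: Iterable[T], process: Callable[[T], object]) -> List[T]:
    """Attempt each item once and return only the items whose processing raised."""
    failed: List[T] = []
    for item in items:
        try:
            process(item)
        except Exception:  # narrower than a bare except
            failed.append(item)
    return failed


def fails_on_odd(n: int) -> None:
    """Hypothetical stand-in for do_something_may_fail: odd numbers fail."""
    if n % 2:
        raise ValueError(f"could not process {n}")


print(collect_failures([1, 2, 3, 4, 5], fails_on_odd))  # [1, 3, 5]
```

Inside `my_test`, `new_list = collect_failures(my_list, do_something_may_fail)` would take the place of the slicing line, and the existing `if new_list:` check then retries only those items.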

Logs (producer)

>>> from tasks import *
>>> my_test.apply_async(args=(0, [1,2,3,4,5], "a"))
<AsyncResult: 121090c6-6b77-4cbd-b1d1-790005e8b18c>
>>>
>>> # The above command is just equivalent to the following (just the same result):
>>> # my_test.apply_async(kwargs={'some_arg_1': 0, 'my_list': [1,2,3,4,5], 'some_arg_2': "a"})
>>> # my_test.apply_async(args=(0,), kwargs={'my_list': [1,2,3,4,5], 'some_arg_2': "a"})
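The three calls are equivalent because each (args, kwargs) pair binds to the same parameter values. This can be checked locally with `inspect.signature(...).bind`; the plain function below is only a stand-in with the same signature as the task, and Celery itself is not involved.

```python
import inspect


def my_test(some_arg_1: int, my_list: list, some_arg_2: str):
    """Plain stand-in with the same signature as the Celery task."""


sig = inspect.signature(my_test)

calls = [
    ((0, [1, 2, 3, 4, 5], "a"), {}),
    ((), {"some_arg_1": 0, "my_list": [1, 2, 3, 4, 5], "some_arg_2": "a"}),
    ((0,), {"my_list": [1, 2, 3, 4, 5], "some_arg_2": "a"}),
]

# Resolve each (args, kwargs) pair to a parameter -> value mapping.
bound = [dict(sig.bind(*args, **kwargs).arguments) for args, kwargs in calls]
print(all(b == bound[0] for b in bound))  # True
```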

Logs (consumer)

[2021-08-25 21:32:06,433: INFO/MainProcess] Task tasks.my_test[121090c6-6b77-4cbd-b1d1-790005e8b18c] received
[2021-08-25 21:32:06,434: WARNING/MainProcess] my_test 0 [1, 2, 3, 4, 5] a
[2021-08-25 21:32:06,434: WARNING/MainProcess] 

[2021-08-25 21:32:06,438: INFO/MainProcess] Task tasks.my_test[121090c6-6b77-4cbd-b1d1-790005e8b18c] retry: Retry in 0.1s
[2021-08-25 21:32:06,439: INFO/MainProcess] Task tasks.my_test[121090c6-6b77-4cbd-b1d1-790005e8b18c] received
[2021-08-25 21:32:06,539: WARNING/MainProcess] my_test 1 [1, 2, 3, 4] aa
[2021-08-25 21:32:06,539: WARNING/MainProcess] 

[2021-08-25 21:32:06,541: INFO/MainProcess] Task tasks.my_test[121090c6-6b77-4cbd-b1d1-790005e8b18c] retry: Retry in 0.1s
[2021-08-25 21:32:06,542: INFO/MainProcess] Task tasks.my_test[121090c6-6b77-4cbd-b1d1-790005e8b18c] received
[2021-08-25 21:32:06,640: WARNING/MainProcess] my_test 2 [1, 2, 3] aaaa
[2021-08-25 21:32:06,640: WARNING/MainProcess] 

[2021-08-25 21:32:06,642: INFO/MainProcess] Task tasks.my_test[121090c6-6b77-4cbd-b1d1-790005e8b18c] retry: Retry in 0.1s
[2021-08-25 21:32:06,643: INFO/MainProcess] Task tasks.my_test[121090c6-6b77-4cbd-b1d1-790005e8b18c] received
[2021-08-25 21:32:06,742: WARNING/MainProcess] my_test 3 [1, 2] aaaaaaaa
[2021-08-25 21:32:06,743: WARNING/MainProcess] 

[2021-08-25 21:32:06,745: INFO/MainProcess] Task tasks.my_test[121090c6-6b77-4cbd-b1d1-790005e8b18c] retry: Retry in 0.1s
[2021-08-25 21:32:06,747: INFO/MainProcess] Task tasks.my_test[121090c6-6b77-4cbd-b1d1-790005e8b18c] received
[2021-08-25 21:32:06,844: WARNING/MainProcess] my_test 4 [1] aaaaaaaaaaaaaaaa
[2021-08-25 21:32:06,844: WARNING/MainProcess] 

[2021-08-25 21:32:06,844: INFO/MainProcess] Task tasks.my_test[121090c6-6b77-4cbd-b1d1-790005e8b18c] succeeded in 0.0005442450019472744s: None
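
  • All task arguments are updated on every retry:
    • some_arg_1 increments by 1 per retry, from a starting value of 0 to a final value of 4
    • my_list loses 1 item per retry, from a starting value of [1, 2, 3, 4, 5] to a final value of [1]
    • some_arg_2 doubles in length per retry, from a starting value of "a" to a final value of "aaaaaaaaaaaaaaaa"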
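The progression in the consumer log can be replayed with a plain loop that applies the same argument updates the task makes before each retry. This is only a local model of the sequence of calls (no broker, no retry delay), handy for checking what the task will receive on each attempt.

```python
def simulate_retries(some_arg_1: int, my_list: list, some_arg_2: str):
    """Replay the argument updates from Solution 1 without Celery.

    Returns one (some_arg_1, my_list, some_arg_2) tuple per attempt.
    """
    attempts = []
    while True:
        attempts.append((some_arg_1, my_list, some_arg_2))
        new_list = my_list[:-1]  # same fake filter as the task: the last item "succeeds"
        if not new_list:
            break  # nothing left to retry, so the task succeeds
        some_arg_1, my_list, some_arg_2 = some_arg_1 + 1, new_list, some_arg_2 * 2
    return attempts


for attempt in simulate_retries(0, [1, 2, 3, 4, 5], "a"):
    print(*attempt)
```

It prints the same five argument sets as the WARNING lines in the consumer log, ending with `4 [1] aaaaaaaaaaaaaaaa`.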

Solution 2

Simply call the same task from within the task itself, a bit like recursion.

tasks.py

from celery import Celery

app = Celery('tasks')


@app.task
def my_test(some_arg_1: int, my_list: list, some_arg_2: str):
    print(f"my_test {some_arg_1} {my_list} {some_arg_2}")

    # Filter the failed items. Here, let's say only the last item is successful.
    new_list = my_list[:-1]

    if new_list:
        my_test.apply_async(
            args=(
                some_arg_1 + 1,  # some_arg_1 increments per retry
                new_list,  # Failed items
                some_arg_2 * 2,  # some_arg_2's length doubles per retry
            ),
            kwargs={},  # Not strictly required here: unlike retry(), apply_async() does not fall back to the current call's kwargs. Kept for symmetry with Solution 1.
        )
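One difference from Solution 1: each `apply_async` call here enqueues a brand-new task with a new task ID, so Celery's retry bookkeeping (`max_retries`, `request.retries`) does not apply, and the chain stops only when `new_list` becomes empty. If an item can fail permanently, one option is to pass a remaining-attempts budget along with the other arguments. The sketch below models that pattern with a `collections.deque` standing in for the broker queue; the `attempts_left` budget and the `fails` callback are hypothetical additions, not part of the original answer.

```python
from collections import deque
from typing import Callable, Deque, List, Tuple

Message = Tuple[List[int], int]  # (items, attempts_left)


def run_requeue(items: List[int], attempts_left: int,
                fails: Callable[[int], bool]) -> List[List[int]]:
    """Model of Solution 2: each call enqueues a *new* message with the failed items.

    attempts_left is a hypothetical budget that ends the chain when items keep failing.
    Returns the item list seen by each call, in order.
    """
    queue: Deque[Message] = deque([(items, attempts_left)])
    seen: List[List[int]] = []
    while queue:
        my_list, left = queue.popleft()  # a worker picks up the next message
        seen.append(my_list)
        new_list = [x for x in my_list if fails(x)]  # keep only the failed items
        if new_list and left > 1:
            queue.append((new_list, left - 1))  # like my_test.apply_async(...) inside the task
    return seen


# Item 3 never succeeds; without the budget this chain would never end.
print(run_requeue([1, 2, 3], attempts_left=3, fails=lambda x: x == 3))
# [[1, 2, 3], [3], [3]]
```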

Logs (producer and consumer)

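  • The printed arguments are the same as in Solution 1. Since every call is a new task, the task IDs differ from run to run, and each run is logged as succeeded rather than retried.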