Celery "retry" with updated arguments
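Consider a task that takes a list as an argument and processes each element in it, where each element may succeed or fail. In this case, how can I "retry" with only the failed elements?
Example: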
@app.task(bind=True)
def my_test(self, my_list: list):
    new_list = []
    for ele in my_list:
        try:
            do_something_may_fail(ele)
        except Exception:
            # Remember the element so it can be retried later
            new_list.append(ele)
    # how to retry with the new list?
    # like
    # self.retry(my_list=new_list, countdown=5)
    # or
    # self.apply_async(new_list, countdown=5)
Solution 1
Use Task.retry with its args and kwargs parameters.
retry(args=None, kwargs=None, exc=None, throw=True, eta=None, countdown=None, max_retries=None, **options)
Retry the task, adding it to the back of the queue.
Parameters
args (Tuple) – Positional arguments to retry with.
kwargs (Dict) – Keyword arguments to retry with.
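Be careful when passing the arguments, because setting a value for the same parameter in both args and kwargs will cause a failure. Below, I chose to use only args=(<values here>) and to empty out kwargs={}. You can also do it the other way around, using only kwargs={<values here>} and emptying out args=().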
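To see why mixing the two fails, here is a minimal sketch in plain Python, with no Celery involved. It assumes that the worker ends up calling the task body roughly as fun(*args, **kwargs), and that retry reuses the original request's kwargs when none are passed explicitly. Both points are my understanding rather than something shown in the documentation excerpt above. The helper call_like_worker is hypothetical.

```python
def my_test(some_arg_1, my_list, some_arg_2):
    """Plain function with the same parameters as the task in this answer."""
    return some_arg_1, my_list, some_arg_2


def call_like_worker(args, kwargs):
    # Assumption: the task body is ultimately invoked as fun(*args, **kwargs).
    return my_test(*args, **kwargs)


# Only args carries values and kwargs is emptied out: the call works.
ok = call_like_worker((1, [1, 2, 3, 4], "aa"), {})

# kwargs still holds my_list from the original call: Python rejects the call.
error = None
try:
    call_like_worker((1, [1, 2, 3, 4], "aa"), {"my_list": [1, 2, 3, 4, 5]})
except TypeError as exc:
    error = str(exc)

print(ok)
print(error)
```

The second call raises a TypeError because my_list receives a value both positionally and by keyword, which is the conflict that emptying one of the two avoids.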
tasks.py
from celery import Celery

app = Celery('tasks')


@app.task(
    bind=True,
    default_retry_delay=0.1,
    retry_backoff=False,
    max_retries=None,
)
def my_test(self, some_arg_1: int, my_list: list, some_arg_2: str):
    print(f"my_test {some_arg_1} {my_list} {some_arg_2}")

    # Filter the failed items. Here, let's say only the last item is successful.
    new_list = my_list[:-1]

    if new_list:
        self.retry(
            args=(
                some_arg_1 + 1,  # some_arg_1 increments per retry
                new_list,  # Failed items
                some_arg_2 * 2,  # some_arg_2's length doubles per retry
            ),
            # Empty it out to avoid having multiple values for the arguments,
            # whether we initially called it with args or kwargs or both.
            kwargs={},
        )
Logs (Producer)
>>> from tasks import *
>>> my_test.apply_async(args=(0, [1,2,3,4,5], "a"))
<AsyncResult: 121090c6-6b77-4cbd-b1d1-790005e8b18c>
>>>
>>> # The call above is equivalent to either of the following (same result):
>>> # my_test.apply_async(kwargs={'some_arg_1': 0, 'my_list': [1,2,3,4,5], 'some_arg_2': "a"})
>>> # my_test.apply_async(args=(0,), kwargs={'my_list': [1,2,3,4,5], 'some_arg_2': "a"})
Logs (Consumer)
[2021-08-25 21:32:06,433: INFO/MainProcess] Task tasks.my_test[121090c6-6b77-4cbd-b1d1-790005e8b18c] received
[2021-08-25 21:32:06,434: WARNING/MainProcess] my_test 0 [1, 2, 3, 4, 5] a
[2021-08-25 21:32:06,434: WARNING/MainProcess]
[2021-08-25 21:32:06,438: INFO/MainProcess] Task tasks.my_test[121090c6-6b77-4cbd-b1d1-790005e8b18c] retry: Retry in 0.1s
[2021-08-25 21:32:06,439: INFO/MainProcess] Task tasks.my_test[121090c6-6b77-4cbd-b1d1-790005e8b18c] received
[2021-08-25 21:32:06,539: WARNING/MainProcess] my_test 1 [1, 2, 3, 4] aa
[2021-08-25 21:32:06,539: WARNING/MainProcess]
[2021-08-25 21:32:06,541: INFO/MainProcess] Task tasks.my_test[121090c6-6b77-4cbd-b1d1-790005e8b18c] retry: Retry in 0.1s
[2021-08-25 21:32:06,542: INFO/MainProcess] Task tasks.my_test[121090c6-6b77-4cbd-b1d1-790005e8b18c] received
[2021-08-25 21:32:06,640: WARNING/MainProcess] my_test 2 [1, 2, 3] aaaa
[2021-08-25 21:32:06,640: WARNING/MainProcess]
[2021-08-25 21:32:06,642: INFO/MainProcess] Task tasks.my_test[121090c6-6b77-4cbd-b1d1-790005e8b18c] retry: Retry in 0.1s
[2021-08-25 21:32:06,643: INFO/MainProcess] Task tasks.my_test[121090c6-6b77-4cbd-b1d1-790005e8b18c] received
[2021-08-25 21:32:06,742: WARNING/MainProcess] my_test 3 [1, 2] aaaaaaaa
[2021-08-25 21:32:06,743: WARNING/MainProcess]
[2021-08-25 21:32:06,745: INFO/MainProcess] Task tasks.my_test[121090c6-6b77-4cbd-b1d1-790005e8b18c] retry: Retry in 0.1s
[2021-08-25 21:32:06,747: INFO/MainProcess] Task tasks.my_test[121090c6-6b77-4cbd-b1d1-790005e8b18c] received
[2021-08-25 21:32:06,844: WARNING/MainProcess] my_test 4 [1] aaaaaaaaaaaaaaaa
[2021-08-25 21:32:06,844: WARNING/MainProcess]
[2021-08-25 21:32:06,844: INFO/MainProcess] Task tasks.my_test[121090c6-6b77-4cbd-b1d1-790005e8b18c] succeeded in 0.0005442450019472744s: None
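- All task arguments were updated on each retry:
  - some_arg_1 was incremented by 1 per retry, from an initial value of 0 to a final value of 4
  - my_list lost 1 item per retry, from an initial value of [1, 2, 3, 4, 5] to a final value of [1]
  - some_arg_2 doubled in length per retry, from an initial value of "a" to a final value of "aaaaaaaaaaaaaaaa"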
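Applied to the task from the question, the part that changes is how the failed items are collected. The sketch below isolates that step in plain Python so it can be run without a broker. The names collect_failed and flaky are hypothetical, and flaky only stands in for do_something_may_fail. The trailing comment shows how the result might be handed to retry, assuming a task declared with bind=True.

```python
from typing import Callable, Iterable, List, TypeVar

T = TypeVar("T")


def collect_failed(items: Iterable[T], handler: Callable[[T], None]) -> List[T]:
    """Run handler on every item and return the items for which it raised."""
    failed = []
    for item in items:
        try:
            handler(item)
        except Exception:
            failed.append(item)
    return failed


def flaky(n: int) -> None:
    # Hypothetical stand-in for do_something_may_fail: odd numbers fail.
    if n % 2:
        raise ValueError(f"cannot process {n}")


failed = collect_failed([1, 2, 3, 4, 5], flaky)
print(failed)

# Inside the bound task, the retry could then look like this:
#     if failed:
#         raise self.retry(args=(failed,), kwargs={}, countdown=5)
```

Passing the failed items through args and emptying kwargs follows the same convention as the answer's example, so it should not matter whether the task was first called positionally or by keyword.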
Solution 2
Simply call the same task again from within the task itself, a bit like recursion.
tasks.py
from celery import Celery

app = Celery('tasks')


@app.task
def my_test(some_arg_1: int, my_list: list, some_arg_2: str):
    print(f"my_test {some_arg_1} {my_list} {some_arg_2}")

    # Filter the failed items. Here, let's say only the last item is successful.
    new_list = my_list[:-1]

    if new_list:
        my_test.apply_async(
            args=(
                some_arg_1 + 1,  # some_arg_1 increments per retry
                new_list,  # Failed items
                some_arg_2 * 2,  # some_arg_2's length doubles per retry
            ),
            # Empty it out to avoid having multiple values for the arguments,
            # whether we initially called it with args or kwargs or both.
            kwargs={},
        )
Logs (Producer and Consumer)
- Same as Solution 1
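Both solutions walk through the same sequence of arguments, which can be checked without a worker. This is a rough simulation in plain Python, not Celery code. The helpers next_args and trace are hypothetical, and max_runs is an extra safety cap added for the sketch, since re-enqueueing by hand has no built-in retry limit.

```python
from typing import List, Optional, Tuple

Args = Tuple[int, List[int], str]


def next_args(some_arg_1: int, my_list: List[int], some_arg_2: str) -> Optional[Args]:
    """Return the arguments for the next run, or None when nothing is left to retry."""
    new_list = my_list[:-1]  # same stand-in rule: only the last item succeeds
    if not new_list:
        return None
    return (some_arg_1 + 1, new_list, some_arg_2 * 2)


def trace(start: Args, max_runs: int = 10) -> List[Args]:
    """Collect the arguments of every run, stopping after max_runs as a safety cap."""
    runs: List[Args] = []
    current: Optional[Args] = start
    while current is not None and len(runs) < max_runs:
        runs.append(current)
        current = next_args(*current)
    return runs


runs = trace((0, [1, 2, 3, 4, 5], "a"))
for run in runs:
    print("my_test", *run)
```

The printed lines should match the "my_test ..." lines in the consumer log of Solution 1, ending with some_arg_1 at 4, my_list at [1], and some_arg_2 at sixteen "a" characters.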