芹菜单元测试重试
Celery unit test retrying
我目前正在为我的 celery 任务编写单元测试,并且想测试我的任务是否正在重试。
注意:ALWAYS_EAGER 在测试设置中设置为 True
@app.shared_task(bind=True, soft_time_limit=600, autoretry_for=(Exception,), retry_kwargs={'max_retries': 3},
retry_backoff=3)
def my_task(self, obj_pk):
try:
# ...
function_call_that_can_raise_exception()
except Exception as exc:
last_try = self.request.retries >= self.retry_kwargs["max_retries"]
if last_try:
# ....
else:
# ...
raise_with_context(exc)
我可以通过模拟 celery.app.task.Task.request;
来测试我最后的 运行
@mock.patch("celery.app.task.Task.request")
def test_my_task(self):
mock_task_request.retries = 3
my_task(12)
# some asserts
如何测试我的任务是否真的自动重试?
诀窍是使用应用而不是延迟或apply_async:
def test_my_task_is_retried(self):
my_task.apply(kwargs={"obj_pk": 12})
# assert what should only happen in the last run
我目前正在为我的 celery 任务编写单元测试,并且想测试我的任务是否正在重试。
注意:ALWAYS_EAGER 在测试设置中设置为 True
@app.shared_task(bind=True, soft_time_limit=600, autoretry_for=(Exception,), retry_kwargs={'max_retries': 3},
retry_backoff=3)
def my_task(self, obj_pk):
try:
# ...
function_call_that_can_raise_exception()
except Exception as exc:
last_try = self.request.retries >= self.retry_kwargs["max_retries"]
if last_try:
# ....
else:
# ...
raise_with_context(exc)
我可以通过模拟 celery.app.task.Task.request;
来测试我最后的 运行@mock.patch("celery.app.task.Task.request")
def test_my_task(self):
mock_task_request.retries = 3
my_task(12)
# some asserts
如何测试我的任务是否真的自动重试?
诀窍是使用应用而不是延迟或apply_async:
def test_my_task_is_retried(self):
my_task.apply(kwargs={"obj_pk": 12})
# assert what should only happen in the last run