在单元测试中获取 Celery 重试方法的参数
Get arguments of Celery retry method in unit test
我有一个延迟异步调用的 Celery 任务。它会在发生异常时自行重试,每次尝试都会增加延迟。
@shared_task(bind=True, max_retries=None)
def cancel_issue_request(self):
try:
raise Exception('test')
except Exception:
countdown = 5 * 60 * (2 ** self.request.retries)
raise self.retry(countdown=countdown, max_retries=5)
这在实践中工作正常,但我正在尝试编写一个单元测试来检查倒计时是否确实具有正确的值。
我试图做的是模拟 cancel_issue_request.retry 方法,但是一旦我这样做了,方法本身就会中断并且不会重试任务。
我认为您可以通过 spec=celery.task.Task.retry 以这样的方式模拟一个方法,它仍然执行其所有功能,但我无法让它工作,请赐教?
或者,我想在某个地方阅读从 Celery 完成的重试,但我无法从 Celery 中获取该信息。
设法在不破坏功能的情况下通过以下方式模拟功能:
cancel_issue_request.retry = Mock(side_effect=cancel_issue_request.retry)
我有一个延迟异步调用的 Celery 任务。它会在发生异常时自行重试,每次尝试都会增加延迟。
@shared_task(bind=True, max_retries=None)
def cancel_issue_request(self):
try:
raise Exception('test')
except Exception:
countdown = 5 * 60 * (2 ** self.request.retries)
raise self.retry(countdown=countdown, max_retries=5)
这在实践中工作正常,但我正在尝试编写一个单元测试来检查倒计时是否确实具有正确的值。
我试图做的是模拟 cancel_issue_request.retry 方法,但是一旦我这样做了,方法本身就会中断并且不会重试任务。 我认为您可以通过 spec=celery.task.Task.retry 以这样的方式模拟一个方法,它仍然执行其所有功能,但我无法让它工作,请赐教?
或者,我想在某个地方阅读从 Celery 完成的重试,但我无法从 Celery 中获取该信息。
设法在不破坏功能的情况下通过以下方式模拟功能:
cancel_issue_request.retry = Mock(side_effect=cancel_issue_request.retry)