如何在 Django-Celery 失败时设置重试任务
How to set retry tasks in case of failure in Django-Celery
我正在尝试 运行 使用芹菜的任务。
我需要在用户按下发送按钮时向远程服务器发送 post 请求,所以我尝试在设置文件中使用以下配置将 celery 与 Redis 一起使用:
BROKER_URL = os.environ.get("REDIS_URL")
CELERY_RESULT_BACKEND = os.environ.get("REDIS_URL")
CELERY_ACCEPT_CONTENT = ["application/json"]
CELERY_TASK_SERIALIZER = 'json'
CELERY_RESULT_SERIALIZER = 'json'
CELERY_TIMEZONE = 'Asia/Dubai'
根据 apply_async 的文档,我可以像下面的代码一样定义重试选项:
__task_expiration = 60
__interval_start = 1 * 60
api_generator.apply_async(args=(*args),
group=user_key,
expires=__task_expiration,
retry=True,
retry_policy={
"max_retries": 3,
"interval_start": __interval_start
})
在文档中我找到了 apply_async 的定义:
apply_async(args=None, kwargs=None, task_id=None, producer=None, link=None, link_error=None, shadow=None, **options)
按照文档,我可以使用重试和 retry_policy
来设置它
以及有关如何定义重试选项的示例代码
add.apply_async((2, 2), retry=True, retry_policy={
'max_retries': 3,
'interval_start': 0,
'interval_step': 0.2,
'interval_max': 0.2,
})
我希望我的任务 运行 3 次到 运行 以防万一失败,并且每次重试之间的间隔为 60 秒。
我的任务定义如下所示:
@shared_task
def api_generator(*args):
import requests
import json
url = os.environ.get("API_URL_CALL")
api_access_key = os.environ.get("API_ACCESS_KEY")
headers = {
"Authorization": api_access_key,
"Content-Type": "application/json"
}
json_schema = generate_json(*args)
response = requests.request("POST", url, headers=headers, data=json.dumps(json_schema), timeout=30)
if response.status_code != 200:
raise NameError("API Response error")
return response.status_code
但是当我的代码失败时,我在 celery 日志中没有看到任何重试机制,这是什么问题?使用 apply_async 方法调用我的任务时如何定义重试?我正在筹集 NameError("Exception")
告诉工作人员发生了错误。
[编辑 1:添加了 acks_late
]
将任务发送给 Celery worker 时,有两件事可能会出错:
- 代理和 Message Queue 的连接问题。
- 工人引发异常。
第一个问题可以通过像您一样定义 retry
和 retry_policy
来解决。
第二种(就是你要解决的),可以在任务失败时调用self.retry()
来解决。
根据您的问题类型,设置 CELERY_ACKS_LATE = True
.
可能会有所帮助
查看这些链接以获取更多信息:
Retry Lost or Failed Tasks (Celery, Django and RabbitMQ)
https://coderbook.com/@marcus/how-to-automatically-retry-failed-tasks-with-celery/
我正在尝试 运行 使用芹菜的任务。 我需要在用户按下发送按钮时向远程服务器发送 post 请求,所以我尝试在设置文件中使用以下配置将 celery 与 Redis 一起使用:
BROKER_URL = os.environ.get("REDIS_URL")
CELERY_RESULT_BACKEND = os.environ.get("REDIS_URL")
CELERY_ACCEPT_CONTENT = ["application/json"]
CELERY_TASK_SERIALIZER = 'json'
CELERY_RESULT_SERIALIZER = 'json'
CELERY_TIMEZONE = 'Asia/Dubai'
根据 apply_async 的文档,我可以像下面的代码一样定义重试选项:
__task_expiration = 60
__interval_start = 1 * 60
api_generator.apply_async(args=(*args),
group=user_key,
expires=__task_expiration,
retry=True,
retry_policy={
"max_retries": 3,
"interval_start": __interval_start
})
在文档中我找到了 apply_async 的定义:
apply_async(args=None, kwargs=None, task_id=None, producer=None, link=None, link_error=None, shadow=None, **options)
按照文档,我可以使用重试和 retry_policy
来设置它以及有关如何定义重试选项的示例代码
add.apply_async((2, 2), retry=True, retry_policy={
'max_retries': 3,
'interval_start': 0,
'interval_step': 0.2,
'interval_max': 0.2,
})
我希望我的任务 运行 3 次到 运行 以防万一失败,并且每次重试之间的间隔为 60 秒。 我的任务定义如下所示:
@shared_task
def api_generator(*args):
import requests
import json
url = os.environ.get("API_URL_CALL")
api_access_key = os.environ.get("API_ACCESS_KEY")
headers = {
"Authorization": api_access_key,
"Content-Type": "application/json"
}
json_schema = generate_json(*args)
response = requests.request("POST", url, headers=headers, data=json.dumps(json_schema), timeout=30)
if response.status_code != 200:
raise NameError("API Response error")
return response.status_code
但是当我的代码失败时,我在 celery 日志中没有看到任何重试机制,这是什么问题?使用 apply_async 方法调用我的任务时如何定义重试?我正在筹集 NameError("Exception")
告诉工作人员发生了错误。
[编辑 1:添加了 acks_late
]
将任务发送给 Celery worker 时,有两件事可能会出错:
- 代理和 Message Queue 的连接问题。
- 工人引发异常。
第一个问题可以通过像您一样定义 retry
和 retry_policy
来解决。
第二种(就是你要解决的),可以在任务失败时调用self.retry()
来解决。
根据您的问题类型,设置 CELERY_ACKS_LATE = True
.
查看这些链接以获取更多信息:
Retry Lost or Failed Tasks (Celery, Django and RabbitMQ)
https://coderbook.com/@marcus/how-to-automatically-retry-failed-tasks-with-celery/