如何在 Django-Celery 失败时设置重试任务

How to set retry tasks in case of failure in Django-Celery

我正在尝试 运行 使用芹菜的任务。 我需要在用户按下发送按钮时向远程服务器发送 post 请求,所以我尝试在设置文件中使用以下配置将 celery 与 Redis 一起使用:

BROKER_URL = os.environ.get("REDIS_URL")
CELERY_RESULT_BACKEND = os.environ.get("REDIS_URL")
CELERY_ACCEPT_CONTENT = ["application/json"]
CELERY_TASK_SERIALIZER = 'json'
CELERY_RESULT_SERIALIZER = 'json'
CELERY_TIMEZONE = 'Asia/Dubai'

根据 apply_async 的文档,我可以像下面的代码一样定义重试选项:

__task_expiration = 60
__interval_start = 1 * 60

api_generator.apply_async(args=(*args),
                                group=user_key,
                                expires=__task_expiration,
                                retry=True,
                                retry_policy={
                                  "max_retries": 3,
                                  "interval_start": __interval_start
                                })

在文档中我找到了 apply_async 的定义:

apply_async(args=None, kwargs=None, task_id=None, producer=None, link=None, link_error=None, shadow=None, **options)

按照文档,我可以使用重试和 retry_policy

来设置它

以及有关如何定义重试选项的示例代码

add.apply_async((2, 2), retry=True, retry_policy={
    'max_retries': 3,
    'interval_start': 0,
    'interval_step': 0.2,
    'interval_max': 0.2,
})

我希望我的任务 运行 3 次到 运行 以防万一失败,并且每次重试之间的间隔为 60 秒。 我的任务定义如下所示:

@shared_task
def api_generator(*args):
    import requests
    import json
    url = os.environ.get("API_URL_CALL")
    api_access_key = os.environ.get("API_ACCESS_KEY")

    headers = {
        "Authorization": api_access_key,
        "Content-Type": "application/json"
    }

    json_schema = generate_json(*args)

    response = requests.request("POST", url, headers=headers, data=json.dumps(json_schema), timeout=30)

    if response.status_code != 200:
        raise NameError("API Response error")

    return response.status_code

但是当我的代码失败时,我在 celery 日志中没有看到任何重试机制,这是什么问题?使用 apply_async 方法调用我的任务时如何定义重试?我正在筹集 NameError("Exception") 告诉工作人员发生了错误。

[编辑 1:添加了 acks_late]

将任务发送给 Celery worker 时,有两件事可能会出错:

  1. 代理和 Message Queue 的连接问题。
  2. 工人引发异常。

第一个问题可以通过像您一样定义 retryretry_policy 来解决。

第二种(就是你要解决的),可以在任务失败时调用self.retry()来解决。

根据您的问题类型,设置 CELERY_ACKS_LATE = True.

可能会有所帮助

查看这些链接以获取更多信息:

Retry Lost or Failed Tasks (Celery, Django and RabbitMQ)

https://coderbook.com/@marcus/how-to-automatically-retry-failed-tasks-with-celery/