芹菜任务永久启动(未重试)

Celery task STARTED permanantly (not retried)

我们在 docker 实例中使用 celery worker。如果 docker 实例被终止(docker 可以更改并恢复),我们需要重试该任务。我的任务目前看起来像这样:

@app.task(bind=True, max_retries=3, default_retry_delay=5, task_acks_late=True)
def build(self, config, import_data):
    build_chain = chain(
        build_dataset_docstore.s(config, import_data),
        build_index.s(),
        assemble_bundle.s()
    ).on_error(handle_chain_error.s())

    return build_chain

@app.task(bind=True, max_retries=3, default_retry_delay=5, task_acks_late=True)
def build_dataset_docstore(self, config, import_data):
    # do lots of stuff

@app.task(bind=True, max_retries=3, default_retry_delay=5, task_acks_late=True)
def build_index(self, config, import_data):
    # do lots of stuff

@app.task(bind=True, max_retries=3, default_retry_delay=5, task_acks_late=True)
def assemble_bundle(self, config, import_data):
    # do lots of stuff

为了模仿容器被重启(worker 被杀死),我运行 下面的脚本:

SLEEP_FOR=1

echo "-> Killing worker"
docker-compose-f docker/docker-compose-dev.yml kill worker

echo "-> Waiting $SLEEP_FOR seconds"
sleep $SLEEP_FOR

echo "-> Bringing worker back to life"
docker-compose-f docker/docker-compose-dev.yml start worker

看着花,我看到任务已经开始...很酷,但是...

编辑: 来自文档:

If the worker won’t shutdown after considerate time, for being stuck in an infinite-loop or similar, you can use the KILL signal to force terminate the worker: but be aware that currently executing tasks will be lost (i.e., unless the tasks have the acks_late option set).

我正在使用 acks late 选项,为什么不重试?

这里的问题似乎是 task_acks_late (https://docs.celeryproject.org/en/latest/userguide/configuration.html#task-acks-late),我认为这是任务中 celery 应用程序的参数。

我将 task_acks_late 更新为 acks_late 并添加了 reject_on_worker_lost 并且此功能符合预期。

因此:

@app.task(bind=True, max_retries=3, default_retry_delay=5, acks_late=True, reject_on_worker_lost=True)
def assemble_bundle(self, config, import_data):
    # do lots of stuff