Celery task STARTED permanently (not retried)
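We're using Celery workers in a Docker instance. If the Docker instance is killed (the container can be replaced and brought back up), we need the task to be retried. My tasks currently look like this: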
from celery import chain

# app and handle_chain_error are defined elsewhere in the project
@app.task(bind=True, max_retries=3, default_retry_delay=5, task_acks_late=True)
def build(self, config, import_data):
    build_chain = chain(
        build_dataset_docstore.s(config, import_data),
        build_index.s(),
        assemble_bundle.s()
    ).on_error(handle_chain_error.s())
    return build_chain

@app.task(bind=True, max_retries=3, default_retry_delay=5, task_acks_late=True)
def build_dataset_docstore(self, config, import_data):
    # do lots of stuff
    ...

@app.task(bind=True, max_retries=3, default_retry_delay=5, task_acks_late=True)
def build_index(self, config, import_data):
    # do lots of stuff
    ...

@app.task(bind=True, max_retries=3, default_retry_delay=5, task_acks_late=True)
def assemble_bundle(self, config, import_data):
    # do lots of stuff
    ...
To simulate the container being restarted (the worker being killed), I run the following script:
SLEEP_FOR=1
echo "-> Killing worker"
docker-compose -f docker/docker-compose-dev.yml kill worker
echo "-> Waiting $SLEEP_FOR seconds"
sleep $SLEEP_FOR
echo "-> Bringing worker back to life"
docker-compose -f docker/docker-compose-dev.yml start worker
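Looking at Flower, I can see the task is STARTED... cool, but:
- Why isn't it retried?
- Do I need to handle this case manually?
- If so, what's the correct way to do it?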
Edit:
From the docs:
If the worker won’t shutdown after considerate time, for being stuck in an infinite-loop or similar, you can use the KILL signal to force terminate the worker: but be aware that currently executing tasks will be lost (i.e., unless the tasks have the acks_late option set).
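The quoted passage turns on *when* the message is acknowledged. The toy model below is a minimal sketch of that idea, assuming a broker that re-queues messages a dead consumer never acknowledged. `ToyBroker`, `WorkerKilled` and `work` are hypothetical names invented for illustration; this is not Celery's or any broker's real API.

```python
from collections import deque


class WorkerKilled(Exception):
    """Stands in for the worker dying mid-task (e.g. `docker-compose kill`)."""


class ToyBroker:
    """Hypothetical in-memory broker used only to illustrate ack timing.

    Delivered messages stay "unacked" until acknowledged; if the consumer
    disappears, unacked messages go back on the queue. This is a teaching
    model, not Celery's or any real broker's API.
    """

    def __init__(self):
        self.ready = deque()
        self.unacked = set()

    def publish(self, msg):
        self.ready.append(msg)

    def deliver(self):
        msg = self.ready.popleft()
        self.unacked.add(msg)
        return msg

    def ack(self, msg):
        self.unacked.discard(msg)

    def consumer_lost(self):
        # Re-queue everything the dead consumer never acknowledged.
        self.ready.extend(sorted(self.unacked))
        self.unacked.clear()


def work(broker, acks_late, crash):
    """Take one message and run it, acking before or after the work."""
    msg = broker.deliver()
    if not acks_late:
        broker.ack(msg)  # early ack (Celery's default): broker forgets it now
    try:
        if crash:
            raise WorkerKilled(msg)
        # ... the task body would run here ...
    except WorkerKilled:
        broker.consumer_lost()  # simulate the broker noticing the dead worker
        return
    if acks_late:
        broker.ack(msg)  # late ack: only after the task has finished


for acks_late in (False, True):
    broker = ToyBroker()
    broker.publish("build-42")
    work(broker, acks_late=acks_late, crash=True)
    print(f"acks_late={acks_late}: queued after crash = {list(broker.ready)}")
```

With an early ack the message is gone after the crash; with a late ack it is back in the queue. In a real deployment, when redelivery happens depends on the broker: RabbitMQ requeues unacknowledged messages once the consumer's connection closes, while Celery's Redis transport redelivers them only after its visibility timeout expires.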
I'm using the acks late option, so why isn't it retried?
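The problem here appears to be task_acks_late (https://docs.celeryproject.org/en/latest/userguide/configuration.html#task-acks-late), which, I believe, is a setting for the Celery app rather than an option for the task. The task decorator accepts it without complaint, but the per-task option is called acks_late.
I changed task_acks_late to acks_late and added reject_on_worker_lost, and it now works as expected. (According to the Celery docs, even with acks_late enabled, the worker acknowledges a task whose executing process exits abruptly or is killed; reject_on_worker_lost=True re-queues the message instead, so the task runs again on the same or another worker.)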
So the task now looks like this:
@app.task(bind=True, max_retries=3, default_retry_delay=5, acks_late=True, reject_on_worker_lost=True)
def assemble_bundle(self, config, import_data):
    # do lots of stuff
    ...