Celery SoftTimeLimitExceeded 和 TimeLimitExceeded

Celery SoftTimeLimitExceeded and TimeLimitExceeded

下面是文档中的代码:

from celery.exceptions import SoftTimeLimitExceeded

@celery.task(soft_time_limit=15, time_limit=20)
def mytask():
    try:
        return do_work()
    except SoftTimeLimitExceeded:
        cleanup_in_a_hurry()

问题是 celery 如何允许捕获函数内部的异常。 如果它执行 my_task 并引发 SoftTimeLimitExceeded,这个异常如何在函数内部传播?

此外,为什么无法在函数内部捕获 TimeLimitExceeded

谢谢。

SoftTimeLimit 正是出于这个原因而存在 - 因此您可以捕获异常并进行处理。如果达到限制,硬限制实际上会从 运行 停止任务。我认为这是故意(而且我会正确地补充)这样设计的,所以我们开发人员不会把事情搞砸。

这里是一个如何捕获 SoftTimeLimitExceeded 异常的例子 (https://github.com/scoringengine/scoringengine/blob/master/scoring_engine/engine/execute_command.py):

from scoring_engine.celery_app import celery_app
from celery.exceptions import SoftTimeLimitExceeded
import subprocess

from scoring_engine.logger import logger


@celery_app.task(name='execute_command', acks_late=True, reject_on_worker_lost=True, soft_time_limit=30)
def execute_command(job):
    output = ""
    # Disable duplicate celery log messages
    if logger.propagate:
        logger.propagate = False
    logger.info("Running cmd for " + str(job))
    try:
        cmd_result = subprocess.run(
            job['command'],
            shell=True,
            stdout=subprocess.PIPE,
            stderr=subprocess.STDOUT
        )
        output = cmd_result.stdout.decode("utf-8")
        job['errored_out'] = False
    except SoftTimeLimitExceeded:
        job['errored_out'] = True
    job['output'] = output
    return job

简而言之,Celery 通过信号中断您的任务并引发 SoftTimeLimitExceeded

请注意,可以通过几种不同的方式将 Celery 配置为 运行 任务(例如线程),但我的回答仅限于进程池。在这种情况下,您的任务正在工作人员的子进程之一中执行。创建这些池进程时,Celery 会注册一个信号处理程序来处理 SIGUSR1 信号。超时后,发送 SIGUSR1。这会中断您的任务:无论它们在 Python 字节码执行中的什么位置,它们都会停止,将 Celery 的信号处理程序添加到它们的堆栈帧中,然后执行 Celery 的处理程序,这会引发 SoftTimeLimitExceeded。异常向上传播堆栈帧(注意这将是中断发生时您的任务所在的位置)直到它被捕获,大概是被您的任务代码捕获。