Celery SoftTimeLimitExceeded 和 TimeLimitExceeded
Celery SoftTimeLimitExceeded and TimeLimitExceeded
下面是文档中的代码:
from celery.exceptions import SoftTimeLimitExceeded
@celery.task(soft_time_limit=15, time_limit=20)
def mytask():
try:
return do_work()
except SoftTimeLimitExceeded:
cleanup_in_a_hurry()
问题是 celery 如何允许捕获函数内部的异常。
如果它执行 my_task
并引发 SoftTimeLimitExceeded
,这个异常如何在函数内部传播?
此外,为什么无法在函数内部捕获 TimeLimitExceeded
?
谢谢。
SoftTimeLimit 正是出于这个原因而存在 - 因此您可以捕获异常并进行处理。如果达到限制,硬限制实际上会从 运行 停止任务。我认为这是故意(而且我会正确地补充)这样设计的,所以我们开发人员不会把事情搞砸。
这里是一个如何捕获 SoftTimeLimitExceeded 异常的例子 (https://github.com/scoringengine/scoringengine/blob/master/scoring_engine/engine/execute_command.py):
from scoring_engine.celery_app import celery_app
from celery.exceptions import SoftTimeLimitExceeded
import subprocess
from scoring_engine.logger import logger
@celery_app.task(name='execute_command', acks_late=True, reject_on_worker_lost=True, soft_time_limit=30)
def execute_command(job):
output = ""
# Disable duplicate celery log messages
if logger.propagate:
logger.propagate = False
logger.info("Running cmd for " + str(job))
try:
cmd_result = subprocess.run(
job['command'],
shell=True,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT
)
output = cmd_result.stdout.decode("utf-8")
job['errored_out'] = False
except SoftTimeLimitExceeded:
job['errored_out'] = True
job['output'] = output
return job
简而言之,Celery 通过信号中断您的任务并引发 SoftTimeLimitExceeded
。
请注意,可以通过几种不同的方式将 Celery 配置为 运行 任务(例如线程),但我的回答仅限于进程池。在这种情况下,您的任务正在工作人员的子进程之一中执行。创建这些池进程时,Celery 会注册一个信号处理程序来处理 SIGUSR1
信号。超时后,发送 SIGUSR1
。这会中断您的任务:无论它们在 Python 字节码执行中的什么位置,它们都会停止,将 Celery 的信号处理程序添加到它们的堆栈帧中,然后执行 Celery 的处理程序,这会引发 SoftTimeLimitExceeded
。异常向上传播堆栈帧(注意这将是中断发生时您的任务所在的位置)直到它被捕获,大概是被您的任务代码捕获。
下面是文档中的代码:
from celery.exceptions import SoftTimeLimitExceeded
@celery.task(soft_time_limit=15, time_limit=20)
def mytask():
try:
return do_work()
except SoftTimeLimitExceeded:
cleanup_in_a_hurry()
问题是 celery 如何允许捕获函数内部的异常。
如果它执行 my_task
并引发 SoftTimeLimitExceeded
,这个异常如何在函数内部传播?
此外,为什么无法在函数内部捕获 TimeLimitExceeded
?
谢谢。
SoftTimeLimit 正是出于这个原因而存在 - 因此您可以捕获异常并进行处理。如果达到限制,硬限制实际上会从 运行 停止任务。我认为这是故意(而且我会正确地补充)这样设计的,所以我们开发人员不会把事情搞砸。
这里是一个如何捕获 SoftTimeLimitExceeded 异常的例子 (https://github.com/scoringengine/scoringengine/blob/master/scoring_engine/engine/execute_command.py):
from scoring_engine.celery_app import celery_app
from celery.exceptions import SoftTimeLimitExceeded
import subprocess
from scoring_engine.logger import logger
@celery_app.task(name='execute_command', acks_late=True, reject_on_worker_lost=True, soft_time_limit=30)
def execute_command(job):
output = ""
# Disable duplicate celery log messages
if logger.propagate:
logger.propagate = False
logger.info("Running cmd for " + str(job))
try:
cmd_result = subprocess.run(
job['command'],
shell=True,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT
)
output = cmd_result.stdout.decode("utf-8")
job['errored_out'] = False
except SoftTimeLimitExceeded:
job['errored_out'] = True
job['output'] = output
return job
简而言之,Celery 通过信号中断您的任务并引发 SoftTimeLimitExceeded
。
请注意,可以通过几种不同的方式将 Celery 配置为 运行 任务(例如线程),但我的回答仅限于进程池。在这种情况下,您的任务正在工作人员的子进程之一中执行。创建这些池进程时,Celery 会注册一个信号处理程序来处理 SIGUSR1
信号。超时后,发送 SIGUSR1
。这会中断您的任务:无论它们在 Python 字节码执行中的什么位置,它们都会停止,将 Celery 的信号处理程序添加到它们的堆栈帧中,然后执行 Celery 的处理程序,这会引发 SoftTimeLimitExceeded
。异常向上传播堆栈帧(注意这将是中断发生时您的任务所在的位置)直到它被捕获,大概是被您的任务代码捕获。