当我不存储他们的任务 ID 时,如何从 redis 后端获取芹菜任务的 results/failures?
How can I get results/failures for celery tasks from a redis backend when I don't store their their task-ID?
在我的网络应用程序中,我使用 celery 启动后台作业而不存储它们的 id。有些任务是周期性的,有些是由用户交互触发的。 celery-tasks 只是做他们的事情,最终用户将在他们的浏览器中看到更新的数据。当任务最近失败时,我想通知所有登录的管理员用户(因为他们通常是触发最近失败的人)。所以他们至少知道出了什么事。
我发现的相关芹菜方法要么需要有效的任务 ID(例如 celery.result.AsyncResult
),要么它们只有关于活动任务的信息,但没有关于 finished/failed 任务的信息(例如 celery.app.control.Inspect
).
我正在使用 flask 前端、用于 celery 的 redis 后端以及用于持久数据的常规数据库。
在这种情况下,我将如何收集有关最近完成或失败的 celery 任务的信息?
我尝试过的:
# I setup celery with
my_celery_project = Celery(__name__,
backend='redis://localhost:1234/0',
broker='redis://localhost:1234/0')
# later in the view I want to collect status information:
i = my_celery_project.control.inspect()
i.active() # this one exists, but I don't care about it
i.failed() # this is what I want, but it doesn't exist
i.results() # this also doesn't exist
# getting the result directly also doesn't work, since they require an id, which i don't have
res = AsyncResult(id_i_don_have,app=app)
应该可以,因为我可以用 redis-cli --scan
从 redis 手动获取结果,然后 my_task.AsyncResult('id_from_redis').status
检查结果。类似于 flower 的东西也可以工作,但我认为这对于网络应用程序的无状态性质来说效果不太好。
这不是这些问题的重复,因为它们不假定 redis 后端。而且它们已经过时 4 年多了:
- Celery: list all tasks, scheduled, active *and* finished
- Python celery: Retrieve tasks arguments if there's an exception
- How can I get a list of succeeded Celery task ids?
这不是这些问题的重复,因为我的 redis-backend 实际上在工作:
- Celery tasks not returning results from redis
这不是这个问题的重复,因为它与我的问题完全相反。他们关心旧的结果,而我明确地只关心最近的结果:
你应该使用信号,像这样:
from celery import signals
@signals.task_failure.connect
def exception_handle(sender, task_id, exception, **kwargs):
if isinstance(exception, redis.exceptions.LockError):
loggert.warning(f"{sender.__qualname__}[{task_id}] can't get lock")
return
loggert.exception(f"{sender.__qualname__}[{task_id}] args={kwargs['args']} kwargs={kwargs['kwargs']} Exception:\n")
@signals.after_setup_logger.connect
def celery_log(logger, **kwargs):
check_console(logger, **kwargs)
@signals.after_setup_task_logger.connect
def task_log(logger, **kwargs):
# todo: add your loggre handle herre...
check_console(logger, **kwargs)
@signals.worker_ready.connect
def clean_lock(**kwargs):
loggert.info('worker_ready')
@signals.worker_init.connect
def hook_prefork(sender, **kwargs):
...
def check_console(logger, format, **kwargs):
if not list(filter(lambda x: type(x) is logging.StreamHandler, logger.handlers)):
console = logging.StreamHandler()
console.setFormatter(logging.Formatter(format))
console.setLevel(logging.INFO)
logger.addHandler(console)
最后我的解决方案是直接从后端获取 ID,然后通过我的 celery-instance:
将它们转换为对象
task_results: List[AsyncResult] = []
for key in my_celery_project.backend.client.scan_iter("celery-task-meta-*"):
task_id = str(key).split("celery-task-meta-", 1)[1].replace("'", "")
task_results.append(self.celery.AsyncResult(task_id))
return task_results
然后我用async_result.ready()
过滤掉我感兴趣的。
附带说明:现在我还调用 async_result.forget()
来清理旧任务,这是我以前没有做过的。
在我的网络应用程序中,我使用 celery 启动后台作业而不存储它们的 id。有些任务是周期性的,有些是由用户交互触发的。 celery-tasks 只是做他们的事情,最终用户将在他们的浏览器中看到更新的数据。当任务最近失败时,我想通知所有登录的管理员用户(因为他们通常是触发最近失败的人)。所以他们至少知道出了什么事。
我发现的相关芹菜方法要么需要有效的任务 ID(例如 celery.result.AsyncResult
),要么它们只有关于活动任务的信息,但没有关于 finished/failed 任务的信息(例如 celery.app.control.Inspect
).
我正在使用 flask 前端、用于 celery 的 redis 后端以及用于持久数据的常规数据库。
在这种情况下,我将如何收集有关最近完成或失败的 celery 任务的信息?
我尝试过的:
# I setup celery with
my_celery_project = Celery(__name__,
backend='redis://localhost:1234/0',
broker='redis://localhost:1234/0')
# later in the view I want to collect status information:
i = my_celery_project.control.inspect()
i.active() # this one exists, but I don't care about it
i.failed() # this is what I want, but it doesn't exist
i.results() # this also doesn't exist
# getting the result directly also doesn't work, since they require an id, which i don't have
res = AsyncResult(id_i_don_have,app=app)
应该可以,因为我可以用 redis-cli --scan
从 redis 手动获取结果,然后 my_task.AsyncResult('id_from_redis').status
检查结果。类似于 flower 的东西也可以工作,但我认为这对于网络应用程序的无状态性质来说效果不太好。
这不是这些问题的重复,因为它们不假定 redis 后端。而且它们已经过时 4 年多了:
- Celery: list all tasks, scheduled, active *and* finished
- Python celery: Retrieve tasks arguments if there's an exception
- How can I get a list of succeeded Celery task ids?
这不是这些问题的重复,因为我的 redis-backend 实际上在工作:
- Celery tasks not returning results from redis
这不是这个问题的重复,因为它与我的问题完全相反。他们关心旧的结果,而我明确地只关心最近的结果:
你应该使用信号,像这样:
from celery import signals
@signals.task_failure.connect
def exception_handle(sender, task_id, exception, **kwargs):
if isinstance(exception, redis.exceptions.LockError):
loggert.warning(f"{sender.__qualname__}[{task_id}] can't get lock")
return
loggert.exception(f"{sender.__qualname__}[{task_id}] args={kwargs['args']} kwargs={kwargs['kwargs']} Exception:\n")
@signals.after_setup_logger.connect
def celery_log(logger, **kwargs):
check_console(logger, **kwargs)
@signals.after_setup_task_logger.connect
def task_log(logger, **kwargs):
# todo: add your loggre handle herre...
check_console(logger, **kwargs)
@signals.worker_ready.connect
def clean_lock(**kwargs):
loggert.info('worker_ready')
@signals.worker_init.connect
def hook_prefork(sender, **kwargs):
...
def check_console(logger, format, **kwargs):
if not list(filter(lambda x: type(x) is logging.StreamHandler, logger.handlers)):
console = logging.StreamHandler()
console.setFormatter(logging.Formatter(format))
console.setLevel(logging.INFO)
logger.addHandler(console)
最后我的解决方案是直接从后端获取 ID,然后通过我的 celery-instance:
将它们转换为对象
task_results: List[AsyncResult] = []
for key in my_celery_project.backend.client.scan_iter("celery-task-meta-*"):
task_id = str(key).split("celery-task-meta-", 1)[1].replace("'", "")
task_results.append(self.celery.AsyncResult(task_id))
return task_results
然后我用async_result.ready()
过滤掉我感兴趣的。
附带说明:现在我还调用 async_result.forget()
来清理旧任务,这是我以前没有做过的。