当我不存储他们的任务 ID 时,如何从 redis 后端获取芹菜任务的 results/failures?

How can I get results/failures for celery tasks from a redis backend when I don't store their their task-ID?

在我的网络应用程序中,我使用 celery 启动后台作业而不存储它们的 id。有些任务是周期性的,有些是由用户交互触发的。 celery-tasks 只是做他们的事情,最终用户将在他们的浏览器中看到更新的数据。当任务最近失败时,我想通知所有登录的管理员用户(因为他们通常是触发最近失败的人)。所以他们至少知道出了什么事。

我发现的相关芹菜方法要么需要有效的任务 ID(例如 celery.result.AsyncResult),要么它们只有关于活动任务的信息,但没有关于 finished/failed 任务的信息(例如 celery.app.control.Inspect).

我正在使用 flask 前端、用于 celery 的 redis 后端以及用于持久数据的常规数据库。

在这种情况下,我将如何收集有关最近完成或失败的 celery 任务的信息?

我尝试过的:

# I setup celery with 
my_celery_project = Celery(__name__,
                backend='redis://localhost:1234/0',
                broker='redis://localhost:1234/0')

# later in the view I want to collect status information:

i = my_celery_project.control.inspect()

i.active() # this one exists, but I don't care about it
i.failed() # this is what I want, but it doesn't exist
i.results() # this also doesn't exist


# getting the result directly also doesn't work, since they require an id, which i don't have
res = AsyncResult(id_i_don_have,app=app)

应该可以,因为我可以用 redis-cli --scan 从 redis 手动获取结果,然后 my_task.AsyncResult('id_from_redis').status 检查结果。类似于 flower 的东西也可以工作,但我认为这对于网络应用程序的无状态性质来说效果不太好。


这不是这些问题的重复,因为它们不假定 redis 后端。而且它们已经过时 4 年多了:

这不是这些问题的重复,因为我的 redis-backend 实际上在工作:

这不是这个问题的重复,因为它与我的问题完全相反。他们关心旧的结果,而我明确地只关心最近的结果:

你应该使用信号,像这样:

from celery import signals

@signals.task_failure.connect
def exception_handle(sender, task_id, exception, **kwargs):
    if isinstance(exception, redis.exceptions.LockError):
        loggert.warning(f"{sender.__qualname__}[{task_id}] can't get lock")
        return
    loggert.exception(f"{sender.__qualname__}[{task_id}] args={kwargs['args']} kwargs={kwargs['kwargs']} Exception:\n")

@signals.after_setup_logger.connect
def celery_log(logger, **kwargs):
    check_console(logger, **kwargs)


@signals.after_setup_task_logger.connect
def task_log(logger, **kwargs):
    # todo: add your loggre handle herre...
    check_console(logger, **kwargs)


@signals.worker_ready.connect
def clean_lock(**kwargs):
    loggert.info('worker_ready')


@signals.worker_init.connect
def hook_prefork(sender, **kwargs):
    ...

def check_console(logger, format, **kwargs):
    if not list(filter(lambda x: type(x) is logging.StreamHandler, logger.handlers)):
        console = logging.StreamHandler()
        console.setFormatter(logging.Formatter(format))
        console.setLevel(logging.INFO)
        logger.addHandler(console)

最后我的解决方案是直接从后端获取 ID,然后通过我的 celery-instance:

将它们转换为对象

  task_results: List[AsyncResult] = []
  for key in my_celery_project.backend.client.scan_iter("celery-task-meta-*"):
    task_id = str(key).split("celery-task-meta-", 1)[1].replace("'", "")
    task_results.append(self.celery.AsyncResult(task_id))
  return task_results

然后我用async_result.ready()过滤掉我感兴趣的。

附带说明:现在我还调用 async_result.forget() 来清理旧任务,这是我以前没有做过的。