DisabledBackend:Celery、Redis 和 Flask 的不稳定行为

DisabledBackend: Erratic Behavior with Celery, Redis & Flask

我已经使用 Celery 一段时间了,在生产中,我使用 RabbitMQ 作为代理,使用 Redis 作为 K8s 集群中的后端,到目前为止没有任何问题。在本地,我 运行 docker 与一些服务(Flask API、2 个不同的 Worker、Beat、Redis、Flower、Hasura)组合,同时使用 Redis 作为代理和后端。

过去几个月我没有遇到此设置的问题,但昨天我在访问任务结果时开始出现不稳定的行为。

任务被发送到队列,工作人员识别它并执行任务,但在查询任务状态时我有时会得到 DisabledBackend。通常在第一次请求时,然后它就可以工作。无法找到何时有效和无效的模式,这是不稳定的。

我在某处读到 Celery 在 flask 的内置服务器上工作得不是很好,所以我切换到 uWSGI,其设置与我在生产中的设置几乎相同:

[uwsgi]
wsgi-file = app/uwsgi.py
callable = application
http = :8080
processes = 4
threads = 2
master = true
chmod-socket = 660
vacuum = true
die-on-term = true
buffer-size = 32768
enable-threads = true
req-logger = python:uwsgi

我在 Django 中看到 similar question 问题似乎出在 Apache 的 WSGI Mod 上,这不是我的情况,但行为似乎相似。我看到的所有其他问题都与后端配置错误有关,但我的情况并非如此。

关于可能导致此问题的任何想法? 谢谢

所以我似乎只需要通过我的 Celery 应用程序实例访问 AsyncResult,而不是通过 Celery,或者将 Celery 应用程序实例作为参数传递。

所以,这行不通:

from celery.result import AsyncResult

@app.route('/status/<task_id>')
def get_status(task_id):
    task = AsyncResult(task_id)
    return task.state

这个有效:

from app import my_celery # Your own Celery Application Instance

@app.route('/status/<task_id>')
def get_status(task_id):
    task = my_celery.AsyncResult(task_id)
    return task.state

这也有效:

from app import my_celery
from celery.result import AsyncResult

@app.route('/status/<task_id>')
def get_status(task_id):
    task = AsyncResult(task_id, app=my_celery)
    return task.state

我猜会发生什么情况是,通过直接从 Celery 调用 AsyncResult,它不会访问 Celery 的配置,因此它认为没有配置后端来查询结果。

但这只能解释函数的完全失败,而不是不稳定的行为。我猜这是因为线程不同,以及应用程序实例很重要的情况,所以 Celery 找到了它,但不太确定。

我已经 运行 进行了一些测试,并且在更改导入后似乎再次正常工作 AsyncResult,但我会继续挖掘。