DisabledBackend:Celery、Redis 和 Flask 的不稳定行为
DisabledBackend: Erratic Behavior with Celery, Redis & Flask
我已经使用 Celery 一段时间了,在生产中,我使用 RabbitMQ 作为代理,使用 Redis 作为 K8s 集群中的后端,到目前为止没有任何问题。在本地,我 运行 docker 与一些服务(Flask API、2 个不同的 Worker、Beat、Redis、Flower、Hasura)组合,同时使用 Redis 作为代理和后端。
过去几个月我没有遇到此设置的问题,但昨天我在访问任务结果时开始出现不稳定的行为。
任务被发送到队列,工作人员识别它并执行任务,但在查询任务状态时我有时会得到 DisabledBackend
。通常在第一次请求时,然后它就可以工作。无法找到何时有效和无效的模式,这是不稳定的。
我在某处读到 Celery 在 flask 的内置服务器上工作得不是很好,所以我切换到 uWSGI,其设置与我在生产中的设置几乎相同:
[uwsgi]
wsgi-file = app/uwsgi.py
callable = application
http = :8080
processes = 4
threads = 2
master = true
chmod-socket = 660
vacuum = true
die-on-term = true
buffer-size = 32768
enable-threads = true
req-logger = python:uwsgi
我在 Django 中看到 similar question 问题似乎出在 Apache 的 WSGI Mod 上,这不是我的情况,但行为似乎相似。我看到的所有其他问题都与后端配置错误有关,但我的情况并非如此。
关于可能导致此问题的任何想法?
谢谢
所以我似乎只需要通过我的 Celery 应用程序实例访问 AsyncResult
,而不是通过 Celery,或者将 Celery 应用程序实例作为参数传递。
所以,这行不通:
from celery.result import AsyncResult
@app.route('/status/<task_id>')
def get_status(task_id):
task = AsyncResult(task_id)
return task.state
这个有效:
from app import my_celery # Your own Celery Application Instance
@app.route('/status/<task_id>')
def get_status(task_id):
task = my_celery.AsyncResult(task_id)
return task.state
这也有效:
from app import my_celery
from celery.result import AsyncResult
@app.route('/status/<task_id>')
def get_status(task_id):
task = AsyncResult(task_id, app=my_celery)
return task.state
我猜会发生什么情况是,通过直接从 Celery 调用 AsyncResult
,它不会访问 Celery 的配置,因此它认为没有配置后端来查询结果。
但这只能解释函数的完全失败,而不是不稳定的行为。我猜这是因为线程不同,以及应用程序实例很重要的情况,所以 Celery 找到了它,但不太确定。
我已经 运行 进行了一些测试,并且在更改导入后似乎再次正常工作 AsyncResult
,但我会继续挖掘。
我已经使用 Celery 一段时间了,在生产中,我使用 RabbitMQ 作为代理,使用 Redis 作为 K8s 集群中的后端,到目前为止没有任何问题。在本地,我 运行 docker 与一些服务(Flask API、2 个不同的 Worker、Beat、Redis、Flower、Hasura)组合,同时使用 Redis 作为代理和后端。
过去几个月我没有遇到此设置的问题,但昨天我在访问任务结果时开始出现不稳定的行为。
任务被发送到队列,工作人员识别它并执行任务,但在查询任务状态时我有时会得到 DisabledBackend
。通常在第一次请求时,然后它就可以工作。无法找到何时有效和无效的模式,这是不稳定的。
我在某处读到 Celery 在 flask 的内置服务器上工作得不是很好,所以我切换到 uWSGI,其设置与我在生产中的设置几乎相同:
[uwsgi]
wsgi-file = app/uwsgi.py
callable = application
http = :8080
processes = 4
threads = 2
master = true
chmod-socket = 660
vacuum = true
die-on-term = true
buffer-size = 32768
enable-threads = true
req-logger = python:uwsgi
我在 Django 中看到 similar question 问题似乎出在 Apache 的 WSGI Mod 上,这不是我的情况,但行为似乎相似。我看到的所有其他问题都与后端配置错误有关,但我的情况并非如此。
关于可能导致此问题的任何想法? 谢谢
所以我似乎只需要通过我的 Celery 应用程序实例访问 AsyncResult
,而不是通过 Celery,或者将 Celery 应用程序实例作为参数传递。
所以,这行不通:
from celery.result import AsyncResult
@app.route('/status/<task_id>')
def get_status(task_id):
task = AsyncResult(task_id)
return task.state
这个有效:
from app import my_celery # Your own Celery Application Instance
@app.route('/status/<task_id>')
def get_status(task_id):
task = my_celery.AsyncResult(task_id)
return task.state
这也有效:
from app import my_celery
from celery.result import AsyncResult
@app.route('/status/<task_id>')
def get_status(task_id):
task = AsyncResult(task_id, app=my_celery)
return task.state
我猜会发生什么情况是,通过直接从 Celery 调用 AsyncResult
,它不会访问 Celery 的配置,因此它认为没有配置后端来查询结果。
但这只能解释函数的完全失败,而不是不稳定的行为。我猜这是因为线程不同,以及应用程序实例很重要的情况,所以 Celery 找到了它,但不太确定。
我已经 运行 进行了一些测试,并且在更改导入后似乎再次正常工作 AsyncResult
,但我会继续挖掘。