postgres 数据库中的 celery 任务结果是字节格式的吗?

celery task result in postgres database is in byte format?

我是 Celery 的新手。我尝试分别使用 redis 和 postgres db 作为我的结果后端。而对于 redis 后端,结果检索成功,对于 postgres 后端,检索的结果似乎是字节格式。我想知道我是否遗漏了 code/setup 中的任何内容,或者这是预期的?

我在 docker 容器中的笔记本电脑上启动了 redis 和 postgres,如下所示:

docker run -d --name redis_broker -p 6379:6379 redis:latest
docker run -d --name pg_backend -p 5432:5432 -e POSTGRES_PASSWORD=123 postgres:latest

然后我写了下面很简单的python代码:

tasks.py

from celery import Celery

app = Celery('tasks',
             #backend='redis://localhost:6379/0',
             backend='db+postgresql://postgres:123@localhost:5432/celery',
             broker='redis://localhost:6379/0')

@app.task(name='tasks.add')
def add(x:int, y:int) -> int:
    z = x+y
    return z

caller.py

from tasks import add

result = add.delay(3,2)
print(result.get())

接下来,我在我的终端中启动我的 celery worker,如下所示:

celery -A tasks workder --loglevel=INFO

在另一个终端window,我运行我的caller.py如下:

python caller.py

caller.py完成后,终端打印出5,正确

然后我尝试从我的结果后端检索 python 中的结果。而对于 redis 后端,我得到了正确的结果:

>>> import redis
>>> backend = redis.Redis(host='localhost', port=6379)
>>> backend.get('celery-task-meta-1523014d-0ea7-42c2-83b9-272bcdb72891')
b'{"status": "SUCCESS", "result": 5, "traceback": null, "children": [], "date_done": "2021-11-13T07:23:55.012370", "task_id": "1523014d-0ea7-42c2-83b9-272bcdb72891"}'

对于 postgres 后端,我得到以下信息:

>>> import sqlalchemy
>>> from sqlalchemy import create_engine
>>> engine = create_engine('postgresql+psycopg2://postgres:123@localhost:5432/celery')
>>> conn = engine.connect()
>>> qry = 'SELECT * FROM celery_taskmeta;'
>>> cursor = conn.execute(qry)
>>> cursor.fetchall()
(1, 'eb21609d-0f84-47c7-afb4-9bccbe41eda4', 'SUCCESS', <memory at 0x11051ef40>, datetime.datetime(2021, 11, 13, 7, 33, 55, 559665), None, None, None, None, None, None, None)]

我也试过在 postgres 中查询

celery=> select id, task_id, result, status from celery_taskmeta;
 id |               task_id                |              result              | status
----+--------------------------------------+----------------------------------+---------
  1 | eb21609d-0f84-47c7-afb4-9bccbe41eda4 | \x80054b052e                     | SUCCESS
(1 rows)

celery=>

在这种情况下,如果我的应用程序的某些其他组件需要从我的(postgres)结果后端检索任务结果,我该如何读取这个“内存对象”?

我想知道我是否遗漏了我的设置或代码中的任何内容?

提前感谢您的任何建议!

嗯,我是通过查看Celery的源代码自己弄明白的(https://github.com/celery/celery/blob/master/celery/backends/database/models.py) 在Celery源码中,对于使用SQLAlchemy的数据库后端,result是序列化的,其类型是:PickleType。因此,我的问题的答案很简单!只需通过调用 pickle.loads() 来简单地反序列化它,如下所示:

>>> import sqlalchemy, pickle
>>> from sqlalchemy import create_engine
>>> engine = create_engine('postgresql+psycopg2://postgres:123@localhost:5432/celery')
>>> conn = engine.connect()
>>> qry = 'SELECT * FROM celery_taskmeta;'
>>> cursor = conn.execute(qry)
>>> res = cursor.fetchall()
>>> res
[(1, 'eb21609d-0f84-47c7-afb4-9bccbe41eda4', 'SUCCESS', <memory at 0x11051ef40>, datetime.datetime(2021, 11, 13, 7, 33, 55, 559665), None, None, None, None, None, None, None)]
>>> pickle.loads(res[0][3])
5

编辑:上面的示例代码仅用于故障排除。在生产环境中,应改用ORM(对象关系映射)。在这种情况下,serialization/deserialization 由 SQLAlchemy 自动处理,不需要显式调用 pickle 方法。