如何配置特定的序列化方法以仅用于 Celery ping?

How can I configure a specific serialization method to use only for Celery ping?

我有一个 celery 应用程序,它必须被另一个应用程序 ping。这个其他应用程序使用 json 序列化芹菜任务参数,但我的应用程序有一个自定义序列化协议。当其他应用程序尝试 ping 我的应用程序 (app.control.ping) 时,它会抛出以下错误:

 "Celery ping failed: Refusing to deserialize untrusted content of type application/x-stjson (application/x-stjson)"

我的整个代码库都依赖于这种自定义编码,所以我想知道是否有一种方法可以配置 json 序列化但仅针对此 ping,并继续使用自定义编码来完成其他任务。

这些是相关的芹菜设置:

accept_content = [CUSTOM_CELERY_SERIALIZATION, "json"]
result_accept_content = [CUSTOM_CELERY_SERIALIZATION, "json"]
result_serializer = CUSTOM_CELERY_SERIALIZATION
task_serializer = CUSTOM_CELERY_SERIALIZATION
event_serializer = CUSTOM_CELERY_SERIALIZATION

将最后 3 个中的任何一个更改为 [CUSTOM_CELERY_SERIALIZATION、“json”] 会导致应用程序崩溃,因此这不是一个选项。

规格:芹菜=5.1.2 python:3.8 OS: Linux docker 容器

如有任何帮助,我们将不胜感激。

Changing any of the last 3 to [CUSTOM_CELERY_SERIALIZATION, "json"] causes the app to crash, so that's not an option.

因为result_serializer, task_serializer, and event_serializer doesn't accept list but just a single str value, unlike e.g. accept_content

  • 列表例如accept_content 是可能的,因为如果有 2 个项目,我们可以检查传入请求的类型是否是这 2 个项目之一。这是不可能的,例如result_serializer因为如果有2个项目,那么task-A的结果应该选什么? (因此需要单个值)
  • 这意味着如果您设置 result_serializer = 'json',这将具有全局效果,其中所有任务的所有结果(任务的返回值可以通过调用例如 response.get() 检索)使用 json-serializer 将是 serialized/deserialized。因此,它可能适用于 ping 但它可能不适用于不能直接 serialized/deserialized to/from JSON 的任务,它确实需要自定义 stjson-serializer.

目前使用 Celery==5.1.2,result_serializer 的特定任务设置似乎是不可能的,因此我们无法设置要在 'json' 中编码的单个任务而不是 'stjson' 没有为所有人设置 globally,我假设同样的情况适用于 ping。

这不是最好的解决方案,但解决方法是,与其在您的应用程序端修复它,不如选择仅在其他应用程序中添加对 serialize/deserialize 类型 'application/x-stjson' 内容的支持。

other_app/celery.py

import ast

from celery import Celery
from kombu.serialization import register


# This is just a possible implementation. Replace with the actual serializer/deserializer for stjson in your app.
def stjson_encoder(obj):
    return str(obj)
def stjson_decoder(obj):
    obj = ast.literal_eval(obj)
    return obj

register(
    'stjson',
    stjson_encoder,
    stjson_decoder,
    content_type='application/x-stjson',
    content_encoding='utf-8',
)

app = Celery('other_app')

app.conf.update(
    accept_content=['json', 'stjson'],
)
  • 你的应用仍然接受和响应stjson格式,但现在另一个应用配置为能够解析这种格式。