区分二进制编码的 Avro 和 JSON 消息

Differentiating between binary encoded Avro and JSON messages

我正在使用 python 阅读来自不同主题的消息。一些主题已将其消息编码为纯 JSON,而其他主题则使用 Avro 二进制序列化和融合模式注册表。

当我收到一条消息时,我需要知道它是否需要解码。目前我只依赖于二进制编码消息以 MAGIC_BYTE 开头的事实,其值为零:

from confluent_kafka.cimpl import Consumer

consumer = Consumer(config)
consumer.subsrcibe(...)
msg = consumer.poll()
# check the msg is not null or error etc
if msg.values()[0] == 0:
      # It is binary encoded
else:
      # It is json

我想知道是否有更好的方法?

您可以先通过 REST 简单地查询架构注册表,然后为在此处注册的主题构建本地缓存。然后,当您尝试解码来自特定主题的消息时,只需将该主题与该列表的内容进行比较即可。如果它在那里,你就知道它已经被解码了。

当然,这只有在 Avro 编码的所有主题都使用 Schema Registry 的情况下才有效。如果您曾经收到过 在 Schema Registry 注册的 Avro 编码消息,那么它将无法工作。

您可以获得消息的 0-5 个字节,然后

magic_byte = message_bytes[0]
schema_id = message_bytes[1:5]

然后,针对 GET /schemas/{schema_id} 对您的注册表执行查找,并在您获得 200 响应代码时缓存 ID + 架构(如果需要)。

否则,消息是 JSON,或者生产者已将其数据发送到不同的注册表(如果您的环境中有多个注册表)。 注意:这意味着数据可能仍然是 Avro