区分二进制编码的 Avro 和 JSON 消息
Differentiating between binary encoded Avro and JSON messages
我正在使用 python 阅读来自不同主题的消息。一些主题已将其消息编码为纯 JSON,而其他主题则使用 Avro 二进制序列化和融合模式注册表。
当我收到一条消息时,我需要知道它是否需要解码。目前我只依赖于二进制编码消息以 MAGIC_BYTE
开头的事实,其值为零:
from confluent_kafka.cimpl import Consumer
consumer = Consumer(config)
consumer.subsrcibe(...)
msg = consumer.poll()
# check the msg is not null or error etc
if msg.values()[0] == 0:
# It is binary encoded
else:
# It is json
我想知道是否有更好的方法?
您可以先通过 REST 简单地查询架构注册表,然后为在此处注册的主题构建本地缓存。然后,当您尝试解码来自特定主题的消息时,只需将该主题与该列表的内容进行比较即可。如果它在那里,你就知道它已经被解码了。
当然,这只有在 Avro 编码的所有主题都使用 Schema Registry 的情况下才有效。如果您曾经收到过 未 在 Schema Registry 注册的 Avro 编码消息,那么它将无法工作。
您可以获得消息的 0-5
个字节,然后
magic_byte = message_bytes[0]
schema_id = message_bytes[1:5]
然后,针对 GET /schemas/{schema_id}
对您的注册表执行查找,并在您获得 200
响应代码时缓存 ID + 架构(如果需要)。
否则,消息是 JSON,或者生产者已将其数据发送到不同的注册表(如果您的环境中有多个注册表)。 注意:这意味着数据可能仍然是 Avro
我正在使用 python 阅读来自不同主题的消息。一些主题已将其消息编码为纯 JSON,而其他主题则使用 Avro 二进制序列化和融合模式注册表。
当我收到一条消息时,我需要知道它是否需要解码。目前我只依赖于二进制编码消息以 MAGIC_BYTE
开头的事实,其值为零:
from confluent_kafka.cimpl import Consumer
consumer = Consumer(config)
consumer.subsrcibe(...)
msg = consumer.poll()
# check the msg is not null or error etc
if msg.values()[0] == 0:
# It is binary encoded
else:
# It is json
我想知道是否有更好的方法?
您可以先通过 REST 简单地查询架构注册表,然后为在此处注册的主题构建本地缓存。然后,当您尝试解码来自特定主题的消息时,只需将该主题与该列表的内容进行比较即可。如果它在那里,你就知道它已经被解码了。
当然,这只有在 Avro 编码的所有主题都使用 Schema Registry 的情况下才有效。如果您曾经收到过 未 在 Schema Registry 注册的 Avro 编码消息,那么它将无法工作。
您可以获得消息的 0-5
个字节,然后
magic_byte = message_bytes[0]
schema_id = message_bytes[1:5]
然后,针对 GET /schemas/{schema_id}
对您的注册表执行查找,并在您获得 200
响应代码时缓存 ID + 架构(如果需要)。
否则,消息是 JSON,或者生产者已将其数据发送到不同的注册表(如果您的环境中有多个注册表)。 注意:这意味着数据可能仍然是 Avro