使用kafka lib反序列化PRIMITIVE AVRO KEY

Deserialization PRIMITIVE AVRO KEY with kafka lib

我目前无法在 KSTREAM APP反序列化 avro PRIMITIVE 密钥

使用 avro 模式编码的密钥(在模式注册表中注册),

当我使用 kafka-avro-console-consumer 时,我可以看到密钥正确反序列化

但不可能让它在 KSTREAM 应用程序中运行

密钥的 avro 模式是原始的:

{"type":"string"}

我已经关注了 confluent 的文档

final Serde<V> valueSpecificAvroSerde = new SpecificAvroSerde<>();
final Map<String, String> serdeConfig = Collections.singletonMap(SCHEMA_REGISTRY_URL_CONFIG, schemaRegistryUrl);
valueSpecificAvroSerde.configure(serdeConfig, false);

final Serdes.StringSerde keySpecificAvroSerde = new Serdes.StringSerde();
keySpecificAvroSerde.configure(serdeConfig, true);

Consumed<String, totoAvro> inputConf = Consumed.with(keySpecificAvroSerde, valueSpecificAvroSerde);

final KStream<String, totoAvro> mystream = builder.stream("name topic", inputConf);

mystream.peek((key, value) -> logger.info("topic KEY :" + key))

它对值很有效,但键将是一个字符串,其中包含模式注册表中的字节,而不仅仅是 "reel" 键

https://docs.confluent.io/current/schema-registry/serializer-formatter.html#wire-format

所以字符串键是 /§/./11016015201 ,但我想要卷轴值: 1016015201

如果我打印字符串中的字节,它是 [0x00 0x00 0x00 0x02 0x31 0x14 0x31 0x30 0x31 0x36 0x30 0x31 0x35 0x32 0x30 0x31 ]

更新

它正在运行:

原回答

该功能当前在模式注册项目中不可用。

但是通过实现自定义 SERDE,您可以解决这个问题,

Thiyaga Rajan 提出了一个可行的实施方案