Spring Cloud Kafka Streams 基于Header 信息的动态消息转换

Spring Cloud Kafka Streams Dynamic Message Conversion based on Header info

我正在尝试使用 Spring Cloud Kafka Streams 来处理来自包含不同类型消息的 Kafka 主题的消息。例如,我们收到来自主题的 JSON 消息,它可以是 A 类或 B 类消息。生产者在 header 中添加消息类型,有没有办法在 Functional Binder 中读取 header 信息并相应地转换消息?或者还有一个 "Choice" 选项用于在消息进入时进行分支,以将消息路由到正确的转换器?

如果您将绑定配置为使用 nativeDecoding,则反序列化由 Kafka 完成(通过 value.deserializer 消费者 属性)。

spring-kafka 提供了一个 JsonDeserializer,它在特定的 header 中查找类型信息(由相应的 JsonSerializer.

设置)

它还提供了一个 DelegatingDeserializer,它允许您 select 根据 spring.kafka.serialization.selector header.

中的值使用哪个反序列化器

有关详细信息,请参阅 the Spring for Apache Kafka Reference Manual