Spring Cloud Kafka Streams 基于Header 信息的动态消息转换
Spring Cloud Kafka Streams Dynamic Message Conversion based on Header info
我正在尝试使用 Spring Cloud Kafka Streams 来处理来自包含不同类型消息的 Kafka 主题的消息。例如,我们收到来自主题的 JSON 消息,它可以是 A 类或 B 类消息。生产者在 header 中添加消息类型,有没有办法在 Functional Binder 中读取 header 信息并相应地转换消息?或者还有一个 "Choice" 选项用于在消息进入时进行分支,以将消息路由到正确的转换器?
如果您将绑定配置为使用 nativeDecoding
,则反序列化由 Kafka 完成(通过 value.deserializer
消费者 属性)。
spring-kafka 提供了一个 JsonDeserializer
,它在特定的 header 中查找类型信息(由相应的 JsonSerializer
.
设置)
它还提供了一个 DelegatingDeserializer
,它允许您 select 根据 spring.kafka.serialization.selector
header.
中的值使用哪个反序列化器
有关详细信息,请参阅 the Spring for Apache Kafka Reference Manual。
我正在尝试使用 Spring Cloud Kafka Streams 来处理来自包含不同类型消息的 Kafka 主题的消息。例如,我们收到来自主题的 JSON 消息,它可以是 A 类或 B 类消息。生产者在 header 中添加消息类型,有没有办法在 Functional Binder 中读取 header 信息并相应地转换消息?或者还有一个 "Choice" 选项用于在消息进入时进行分支,以将消息路由到正确的转换器?
如果您将绑定配置为使用 nativeDecoding
,则反序列化由 Kafka 完成(通过 value.deserializer
消费者 属性)。
spring-kafka 提供了一个 JsonDeserializer
,它在特定的 header 中查找类型信息(由相应的 JsonSerializer
.
它还提供了一个 DelegatingDeserializer
,它允许您 select 根据 spring.kafka.serialization.selector
header.
有关详细信息,请参阅 the Spring for Apache Kafka Reference Manual。