Spring Cloud Stream 中 Kafka headers 上的条件 (content-based) 路由
Conditional (content-based) routing on Kafka headers in Spring Cloud Stream
我在 Spring Boot 2.x 应用程序中使用 Spring Cloud Stream 3.x 来使用来自 Kafka 主题的消息。
我想要一个监听器,根据 doc:
根据某些自定义 header 值有条件地使用消息
@StreamListener(value = "someTopic", condition = "headers['SomeHeader']=='SomeHeaderValue'")
public void onMessage(Message<?> message) {
LOGGER.info("Received: {}", message);
}
但是侦听器永远不会收到通知,如果条件被删除,我会在日志中看到以下内容:
Received: ... SomeHeader: [B@1055e4af ...
事实证明,自定义 headers 保留在 Kafka 字节数组原始格式中,使它们不符合条件评估条件。
是否需要一些额外的配置或者我是否遗漏了什么?
在对资源和 stackoveflow 进行一些挖掘之后,我发现了以下内容:
- Spring Cloud Stream 委托给 Spring Kafka 消息和 headers 转换 (KafkaMessageChannelBinder ~ getHeaderMapper)
- Headers 默认保留原始格式 headers 转换实现 (BinderHeaderMapper)
- Spring Cloud Stream 允许自定义 headers 映射,特别是将 headers 从字节数组转换为字符串 (How can I map incoming headers as String instead of byte[] in my Spring Cloud Stream project?)
所以我添加了我的自定义 header 映射器 bean(bean 名称很重要,它允许省略其他配置 属性),它将我的自定义 header 映射到 String:
@Bean
public KafkaHeaderMapper kafkaBinderHeaderMapper() {
SimpleKafkaHeaderMapper headerMapper = new SimpleKafkaHeaderMapper();
headerMapper.setRawMappedHeaders(Map.of(
"SomeHeader", true
));
return headerMapper;
}
解决了问题:
Received: ... SomeHeader: SomeHeaderValue ...
P.S。这似乎是 Spring Cloud Stream:
中的错误
- 它引入了自己的 header 映射器(BinderHeaderMapper)实现,但后者不支持条件路由功能。
- Header 映射器是 KafkaMessageChannelBinder 的子类,此添加的行为是 non-obvious,如果提供自定义 header 映射器将丢失。
我在 Spring Boot 2.x 应用程序中使用 Spring Cloud Stream 3.x 来使用来自 Kafka 主题的消息。
我想要一个监听器,根据 doc:
根据某些自定义 header 值有条件地使用消息@StreamListener(value = "someTopic", condition = "headers['SomeHeader']=='SomeHeaderValue'")
public void onMessage(Message<?> message) {
LOGGER.info("Received: {}", message);
}
但是侦听器永远不会收到通知,如果条件被删除,我会在日志中看到以下内容:
Received: ... SomeHeader: [B@1055e4af ...
事实证明,自定义 headers 保留在 Kafka 字节数组原始格式中,使它们不符合条件评估条件。
是否需要一些额外的配置或者我是否遗漏了什么?
在对资源和 stackoveflow 进行一些挖掘之后,我发现了以下内容:
- Spring Cloud Stream 委托给 Spring Kafka 消息和 headers 转换 (KafkaMessageChannelBinder ~ getHeaderMapper)
- Headers 默认保留原始格式 headers 转换实现 (BinderHeaderMapper)
- Spring Cloud Stream 允许自定义 headers 映射,特别是将 headers 从字节数组转换为字符串 (How can I map incoming headers as String instead of byte[] in my Spring Cloud Stream project?)
所以我添加了我的自定义 header 映射器 bean(bean 名称很重要,它允许省略其他配置 属性),它将我的自定义 header 映射到 String:
@Bean
public KafkaHeaderMapper kafkaBinderHeaderMapper() {
SimpleKafkaHeaderMapper headerMapper = new SimpleKafkaHeaderMapper();
headerMapper.setRawMappedHeaders(Map.of(
"SomeHeader", true
));
return headerMapper;
}
解决了问题:
Received: ... SomeHeader: SomeHeaderValue ...
P.S。这似乎是 Spring Cloud Stream:
中的错误- 它引入了自己的 header 映射器(BinderHeaderMapper)实现,但后者不支持条件路由功能。
- Header 映射器是 KafkaMessageChannelBinder 的子类,此添加的行为是 non-obvious,如果提供自定义 header 映射器将丢失。