Spring-Kafka @KafkaHandlers 不消耗各自的 java 对象
Spring-Kafka @KafkaHandlers not consuming respective java Objects
我知道这个问题已经被问过很多次了,但是,我还没有找到解决方案。我可以通过我的@KafkaListener(在 class 级别)使用特定的 Java 对象并且工作得很好,但是,当我尝试从同一个对象使用多个不同的 JSON 对象时主题(我在 class 级别使用@KafkaListener,在方法级别使用@KafkaHandler,每个@KafkaHandler 方法期望不同的对象),它总是生成 LinkedHashMap。我可以解析它并获取数据并执行工厂模式以根据 json 字段生成不同的实例,但是,当 Spring 可以自动检测特定的 @KafkaHandler 时,我不想这样做路由消息。
我如何使用单个 @KafkaListener.
从单个主题中使用不同的 JSON 对象
我正在使用以下配置:
public ConsumerFactory<String, Object> abcConsumerFactory() {
Map<String, Object> config = new HashMap<>();
config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
config.put(ConsumerConfig.GROUP_ID_CONFIG,"group-1");
config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,JsonDeserializer.class );
config.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
config.put(JsonDeserializer.TRUSTED_PACKAGES, "*");
return new DefaultKafkaConsumerFactory<>(config, new StringDeserializer(), new JsonDeserializer<>(Object.class, false));
@Bean
public ConcurrentKafkaListenerContainerFactory<String, Object> abcListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(abcConsumerFactory());
return factory;
}
如果我在上面的配置中使用实际的 class(Foo 或 Bar)而不是对象,它对特定对象工作正常,但是,当我尝试概括时,它不会转到特定的@ KafkaHandler,而是转到负载类型为 LinkedHashMap 的 @KafkaHandler(我试图检查它是否被传送到 @KafkaHandler)
@Bean
public ConcurrentKafkaListenerContainerFactory<String, Object> abcListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(abcConsumerFactory());
return factory;
}
Class:
@KafkaListener(topics={"abc_gh"}, containerFactory = "abcListenerContainerFactory")
@Service
public class MyListener {
@KafkaHandler
public void consumeMessage(@Payload Foo f) {
//only comes here when i use Foo in my configs instead of Object
}
@KafkaHandler
public void consumeMessage22(@Payload Bar b) {
//only comes here when i use Bar in my configs instead of Object
}
@KafkaHandler
public void consumeMessage77(@Payload LinkedHashMap bc) {
//comes here when i use Object in the configs, even if i expect a Foo or a Bar object
}
}
我想分享的一件事是 Producer 没有使用 Spring-Kafka。
我不知道我错过了什么,我尝试了很多东西但没有运气。
正如the documentation所说:
When using @KafkaHandler methods, the payload must have already been converted to the domain object (so the match can be performed). Use a custom deserializer, the JsonDeserializer
, or the JsonMessageConverter
with its TypePrecedence set to TYPE_ID
. See Serialization, Deserialization, and Message Conversion for more information.
为了路由到正确的方法,ConsumerRecord
中必须已经有正确的类型。
config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,JsonDeserializer.class );
您没有向反序列化器提供有关 object 要创建的内容的任何信息。
当Spring是生产者时,它在记录header中添加类型信息,反序列化器可以使用这些信息来构造正确的类型。
如果没有类型信息,您将得到 Map
。
生产者必须设置类型 headers 才能正常工作。
当@KaflaListener
处于方法级别时,我们可以根据方法参数确定要创建的类型。在 class 级别,我们有一个 catch-22 - 我们不能选择一种方法,除非记录已经被转换。
您的生产者不必知道实际类型,但它至少必须提供一个 header 可用于查找我们需要转换为的类型。
参见 Mapping Types。
或者,生产者的 JSON 序列化程序必须配置为将类型信息添加到 JSON 本身。
另一个选项是自定义反序列化器,它 "peeks" 在 JSON 处确定 class 要实例化的内容。
我知道这个问题已经被问过很多次了,但是,我还没有找到解决方案。我可以通过我的@KafkaListener(在 class 级别)使用特定的 Java 对象并且工作得很好,但是,当我尝试从同一个对象使用多个不同的 JSON 对象时主题(我在 class 级别使用@KafkaListener,在方法级别使用@KafkaHandler,每个@KafkaHandler 方法期望不同的对象),它总是生成 LinkedHashMap。我可以解析它并获取数据并执行工厂模式以根据 json 字段生成不同的实例,但是,当 Spring 可以自动检测特定的 @KafkaHandler 时,我不想这样做路由消息。 我如何使用单个 @KafkaListener.
从单个主题中使用不同的 JSON 对象我正在使用以下配置:
public ConsumerFactory<String, Object> abcConsumerFactory() {
Map<String, Object> config = new HashMap<>();
config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
config.put(ConsumerConfig.GROUP_ID_CONFIG,"group-1");
config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,JsonDeserializer.class );
config.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
config.put(JsonDeserializer.TRUSTED_PACKAGES, "*");
return new DefaultKafkaConsumerFactory<>(config, new StringDeserializer(), new JsonDeserializer<>(Object.class, false));
@Bean
public ConcurrentKafkaListenerContainerFactory<String, Object> abcListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(abcConsumerFactory());
return factory;
}
如果我在上面的配置中使用实际的 class(Foo 或 Bar)而不是对象,它对特定对象工作正常,但是,当我尝试概括时,它不会转到特定的@ KafkaHandler,而是转到负载类型为 LinkedHashMap 的 @KafkaHandler(我试图检查它是否被传送到 @KafkaHandler)
@Bean
public ConcurrentKafkaListenerContainerFactory<String, Object> abcListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(abcConsumerFactory());
return factory;
}
Class:
@KafkaListener(topics={"abc_gh"}, containerFactory = "abcListenerContainerFactory")
@Service
public class MyListener {
@KafkaHandler
public void consumeMessage(@Payload Foo f) {
//only comes here when i use Foo in my configs instead of Object
}
@KafkaHandler
public void consumeMessage22(@Payload Bar b) {
//only comes here when i use Bar in my configs instead of Object
}
@KafkaHandler
public void consumeMessage77(@Payload LinkedHashMap bc) {
//comes here when i use Object in the configs, even if i expect a Foo or a Bar object
}
}
我想分享的一件事是 Producer 没有使用 Spring-Kafka。
我不知道我错过了什么,我尝试了很多东西但没有运气。
正如the documentation所说:
When using @KafkaHandler methods, the payload must have already been converted to the domain object (so the match can be performed). Use a custom deserializer, the
JsonDeserializer
, or theJsonMessageConverter
with its TypePrecedence set toTYPE_ID
. See Serialization, Deserialization, and Message Conversion for more information.
为了路由到正确的方法,ConsumerRecord
中必须已经有正确的类型。
config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,JsonDeserializer.class );
您没有向反序列化器提供有关 object 要创建的内容的任何信息。
当Spring是生产者时,它在记录header中添加类型信息,反序列化器可以使用这些信息来构造正确的类型。
如果没有类型信息,您将得到 Map
。
生产者必须设置类型 headers 才能正常工作。
当@KaflaListener
处于方法级别时,我们可以根据方法参数确定要创建的类型。在 class 级别,我们有一个 catch-22 - 我们不能选择一种方法,除非记录已经被转换。
您的生产者不必知道实际类型,但它至少必须提供一个 header 可用于查找我们需要转换为的类型。
参见 Mapping Types。
或者,生产者的 JSON 序列化程序必须配置为将类型信息添加到 JSON 本身。
另一个选项是自定义反序列化器,它 "peeks" 在 JSON 处确定 class 要实例化的内容。