Spring-Kafka @KafkaHandlers 不消耗各自的 java 对象

Spring-Kafka @KafkaHandlers not consuming respective java Objects

我知道这个问题已经被问过很多次了,但是,我还没有找到解决方案。我可以通过我的@KafkaListener(在 class 级别)使用特定的 Java 对象并且工作得很好,但是,当我尝试从同一个对象使用多个不同的 JSON 对象时主题(我在 class 级别使用@KafkaListener,在方法级别使用@KafkaHandler,每个@KafkaHandler 方法期望不同的对象),它总是生成 LinkedHashMap。我可以解析它并获取数据并执行工厂模式以根据 json 字段生成不同的实例,但是,当 Spring 可以自动检测特定的 @KafkaHandler 时,我不想这样做路由消息。 我如何使用单个 @KafkaListener.

从单个主题中使用不同的 JSON 对象

我正在使用以下配置:

public ConsumerFactory<String, Object> abcConsumerFactory() {       
    Map<String, Object> config = new HashMap<>();       
    config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    config.put(ConsumerConfig.GROUP_ID_CONFIG,"group-1");
    config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,JsonDeserializer.class );
    config.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
    config.put(JsonDeserializer.TRUSTED_PACKAGES, "*");

    return new DefaultKafkaConsumerFactory<>(config, new StringDeserializer(), new JsonDeserializer<>(Object.class, false)); 

@Bean
public ConcurrentKafkaListenerContainerFactory<String, Object> abcListenerContainerFactory() {      
    ConcurrentKafkaListenerContainerFactory<String, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(abcConsumerFactory());
    return factory;
    }

如果我在上面的配置中使用实际的 class(Foo 或 Bar)而不是对象,它对特定对象工作正常,但是,当我尝试概括时,它不会转到特定的@ KafkaHandler,而是转到负载类型为 LinkedHashMap 的 @KafkaHandler(我试图检查它是否被传送到 @KafkaHandler)

@Bean
public ConcurrentKafkaListenerContainerFactory<String, Object> abcListenerContainerFactory() {      
    ConcurrentKafkaListenerContainerFactory<String, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(abcConsumerFactory());
    return factory;
    }

Class:  

    @KafkaListener(topics={"abc_gh"}, containerFactory = "abcListenerContainerFactory")
    @Service
    public class MyListener {

    @KafkaHandler
    public void consumeMessage(@Payload Foo f) {
        //only comes here when i use Foo in my configs instead of Object

    }

    @KafkaHandler
    public void consumeMessage22(@Payload Bar b) {
        //only comes here when i use Bar in my configs instead of Object

    }   

    @KafkaHandler
    public void consumeMessage77(@Payload LinkedHashMap bc) {

        //comes here when i use Object in the configs, even if i expect a Foo or a Bar object

    }               
    }

我想分享的一件事是 Producer 没有使用 Spring-Kafka。

我不知道我错过了什么,我尝试了很多东西但没有运气。

正如the documentation所说:

When using @KafkaHandler methods, the payload must have already been converted to the domain object (so the match can be performed). Use a custom deserializer, the JsonDeserializer, or the JsonMessageConverter with its TypePrecedence set to TYPE_ID. See Serialization, Deserialization, and Message Conversion for more information.

为了路由到正确的方法,ConsumerRecord 中必须已经有正确的类型。

config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,JsonDeserializer.class );

您没有向反序列化器提供有关 object 要创建的内容的任何信息。

当Spring是生产者时,它在记录header中添加类型信息,反序列化器可以使用这些信息来构造正确的类型。

如果没有类型信息,您将得到 Map

生产者必须设置类型 headers 才能正常工作。

@KaflaListener处于方法级别时,我们可以根据方法参数确定要创建的类型。在 class 级别,我们有一个 catch-22 - 我们不能选择一种方法,除非记录已经被转换。

您的生产者不必知道实际类型,但它至少必须提供一个 header 可用于查找我们需要转换为的类型。

参见 Mapping Types

或者,生产者的 JSON 序列化程序必须配置为将类型信息添加到 JSON 本身。

另一个选项是自定义反序列化器,它 "peeks" 在 JSON 处确定 class 要实例化的内容。