Multiple implementations for Kafka messages (Java generics)

I am trying to create multiple implementation classes for Kafka events, selected by event type.

public class MyKafkaListener {
    @Autowired
    Service service;

    @KafkaListener(topics = ("mytopic"), containerFactory = "kafkaListenerContainerFactory")
    public void consumeSource(Object event) {
        service.process(event);
    }
}

public interface Service<E> {
    void process(E event);
}

public class ServiceImpl1 implements Service<Event1> {
    @Override
    public void process(Event1 event1) {
         // process
    }
}

public class ServiceImpl2 implements Service<Event2> {
    @Override
    public void process(Event2 event2) {
         // process
    }
}

//Event1 & Event2 are 2 POJO classes with different inputs

Is this achievable, or should I create multiple listeners, one per event type?

As long as the events are deserialized by Kafka, you can use a class-level @KafkaListener with method-level @KafkaHandler methods.

See the documentation:

When you use @KafkaListener at the class-level, you must specify @KafkaHandler at the method level. When messages are delivered, the converted message payload type is used to determine which method to call. The following example shows how to do so:

@KafkaListener(id = "multi", topics = "myTopic")
static class MultiListenerBean {

    @KafkaHandler
    public void listen(String foo) {
        ...
    }

    @KafkaHandler
    public void listen(Integer bar) {
        ...
    }

    @KafkaHandler(isDefault = true)
    public void listenDefault(Object object) {
        ...
    }

}

Starting with version 2.1.3, you can designate a @KafkaHandler method as the default method that is invoked if there is no match on other methods. At most, one method can be so designated. When using @KafkaHandler methods, the payload must have already been converted to the domain object (so the match can be performed). Use a custom deserializer, the JsonDeserializer, or the JsonMessageConverter with its TypePrecedence set to TYPE_ID. See Serialization, Deserialization, and Message Conversion for more information.
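If you would rather keep the question's single `Service<E>` interface, the same idea of routing by payload type can be sketched in plain Java. This is only an illustration of the dispatch step, not Spring Kafka's implementation: `ServiceRegistry`, `register`, `dispatch`, and the fields on `Event1`/`Event2` are hypothetical names invented for the example. It also assumes the payload has already been deserialized into `Event1` or `Event2`.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class EventDispatchSketch {

    // Stand-ins for the question's POJOs; the fields are made up for illustration.
    public static class Event1 {
        public final String name;
        public Event1(String name) { this.name = name; }
    }

    public static class Event2 {
        public final int count;
        public Event2(int count) { this.count = count; }
    }

    // Same shape as the Service<E> interface in the question.
    public interface Service<E> {
        void process(E event);
    }

    // Hypothetical registry: maps a payload class to the service that handles it.
    public static class ServiceRegistry {
        private final Map<Class<?>, Service<?>> services = new HashMap<>();
        private final List<Object> unhandled = new ArrayList<>();

        // Tying Class<E> to Service<E> keeps each registration type-safe.
        public <E> void register(Class<E> type, Service<E> service) {
            services.put(type, service);
        }

        // Plays the role of the listener method: route by the payload's runtime class.
        @SuppressWarnings("unchecked")
        public void dispatch(Object event) {
            Service<Object> service = (Service<Object>) services.get(event.getClass());
            if (service != null) {
                service.process(event);
            } else {
                unhandled.add(event); // comparable to a default handler
            }
        }

        public List<Object> unhandled() {
            return unhandled;
        }
    }

    public static void main(String[] args) {
        List<String> handled = new ArrayList<>();
        ServiceRegistry registry = new ServiceRegistry();
        registry.register(Event1.class, e -> handled.add("event1:" + e.name));
        registry.register(Event2.class, e -> handled.add("event2:" + e.count));

        registry.dispatch(new Event1("a"));
        registry.dispatch(new Event2(3));
        registry.dispatch("not an event"); // no service registered for String

        System.out.println(handled);                     // [event1:a, event2:3]
        System.out.println(registry.unhandled().size()); // 1
    }
}
```

The lookup uses the exact runtime class, so a subclass of `Event1` would land in the unhandled list here. With the `@KafkaHandler` approach you do not need such a registry: each handler method can take `Event1` or `Event2` directly and call the matching service.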