如何按类型消费来自Kafka的消息

How to consume messages from Kafka by type

我是 kafka 的新手,对此有一些疑问。我已经配置了一个 kafka 消费者来消费来自主题的消息,并且我有不同类型的事件进入主题。 f.e.

class Event {
    String type;
    Object event;
}

我想配置不同的kafka监听器来消费不同类型的事件。 我看到了两种方法,比如使用 String(json) 格式的事件和转换为事件对象并在类型之间切换并执行业务逻辑,或者配置不同的 kafka 侦听器工厂

    public ConcurrentKafkaListenerContainerFactory<String, String> businessEventStreamListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.setRecordFilterStrategy(consumerRecord -> !consumerRecord.value().contains("typeOfObjectIWantToConsume"));
        return factory;
    }

所以第一种方法不是 SOLID,对于第二种方法,我需要创建很多工厂 类。有没有办法更优雅地做到这一点?

嗯,这种业务需求确实是我们引入 @KafkaHandler 的原因。

在文档中查看更多信息:https://docs.spring.io/spring-kafka/docs/current/reference/html/#class-level-kafkalistener