如何按类型消费来自Kafka的消息
How to consume messages from Kafka by type
我是 kafka 的新手,对此有一些疑问。我已经配置了一个 kafka 消费者来消费来自主题的消息,并且我有不同类型的事件进入主题。
f.e.
class Event {
String type;
Object event;
}
我想配置不同的kafka监听器来消费不同类型的事件。
我看到了两种方法,比如使用 String(json) 格式的事件和转换为事件对象并在类型之间切换并执行业务逻辑,或者配置不同的 kafka 侦听器工厂
public ConcurrentKafkaListenerContainerFactory<String, String> businessEventStreamListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.setRecordFilterStrategy(consumerRecord -> !consumerRecord.value().contains("typeOfObjectIWantToConsume"));
return factory;
}
所以第一种方法不是 SOLID
,对于第二种方法,我需要创建很多工厂 类。有没有办法更优雅地做到这一点?
嗯,这种业务需求确实是我们引入 @KafkaHandler
的原因。
在文档中查看更多信息:https://docs.spring.io/spring-kafka/docs/current/reference/html/#class-level-kafkalistener。
我是 kafka 的新手,对此有一些疑问。我已经配置了一个 kafka 消费者来消费来自主题的消息,并且我有不同类型的事件进入主题。 f.e.
class Event {
String type;
Object event;
}
我想配置不同的kafka监听器来消费不同类型的事件。 我看到了两种方法,比如使用 String(json) 格式的事件和转换为事件对象并在类型之间切换并执行业务逻辑,或者配置不同的 kafka 侦听器工厂
public ConcurrentKafkaListenerContainerFactory<String, String> businessEventStreamListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.setRecordFilterStrategy(consumerRecord -> !consumerRecord.value().contains("typeOfObjectIWantToConsume"));
return factory;
}
所以第一种方法不是 SOLID
,对于第二种方法,我需要创建很多工厂 类。有没有办法更优雅地做到这一点?
嗯,这种业务需求确实是我们引入 @KafkaHandler
的原因。
在文档中查看更多信息:https://docs.spring.io/spring-kafka/docs/current/reference/html/#class-level-kafkalistener。