默认的kafkaListenerContainerFactory是如何工作的
How does default kafkaListenerContainerFactory work
我有一个 kafka 消费者 java 应用程序。
我正在阅读该应用程序中的两个单独的 kafka 主题。
对于主题1,有一个名为kafkaListenerContainerFactory 的KafkaListenerContainerFactory,下面是代码片段。该消息采用 avro 格式。 Pojo1 是 pojo class
使用 avro 架构构建。
@Bean
public ConcurrentKafkaListenerContainerFactory<String, Pojo1> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, Pojo1> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
return factory;
}
为了消费来自主题 1 的消息,我们有以下方法
@kafkaListener (topics="topic1") public boolean readTopic1(ConsumerRecord<String,Pojo1> record){ // logic }
现在这个应用程序也在读取来自 topic2 的消息,它也有 avro 消息,Pojo2 是使用各自的 avro 模式构建的 pojo class。
要阅读topic2,我们有以下方法
@kafkaListener (topics="topic2") public boolean readTopic2(ConsumerRecord<String,Pojo2> record){ // logic }
现在我无法了解来自 topic2 的消费消息如何按预期工作。
kafkaListenerContainerFactory 是用 Pojo1 配置的,那么如何从 Pojo2 的 topic2 读取消息。
据我所知,只有在缺少名为“kafkaListenerContainerFactory”的 bean 时,kafka 才会创建默认容器工厂,但在我的应用程序中
我们已经为 topic1 构建了一个 kafkaListenerContainerFactory。
根据Pojo2应该有另外一个KafkaListenerContainerFactory创建,使用时应该给出参考@kafkaListener (topics="topic2", containerFactory="factoryForTopic2")
谁能帮助我了解 KafkaListenerContainerFactory 如何针对具有不同消息模式的不同主题工作。
这叫做类型擦除;通用类型仅在编译时适用。在运行时,只要您的反序列化器创建正确的类型,它就会工作。
为了避免混淆,使用 ConcurrentKafkaListenerContainerFactory<String, Object>
可能更简洁;特别是在使用可以创建不同类型的 avro 或 json 反序列化器时。
也就是说,您的 bean 名为 importDecisionMessageKafkaListenerContainerFactory
。对于使用非标准工厂的监听器,它的名称必须在监听器的 containerFactory
属性 中指定;否则使用 kafkaListenerContainerFactory
。
这仅适用于您的情况,因为反序列化器能够将 String 反序列化为两个对象。但是假设您使用 Avro,并且您仍然同时拥有 Pojo1 和 Pojo2。在这种情况下,您将需要两个容器工厂。如果您不为 Pojo2 提供一个异常,将抛出一个异常 - MessageConversionException: cannot convert String to Pojo2 for GenericMessage
。解决方案是定义 @kafkaListener (topics="topic2", containerFactory="factoryForTopic2")
我有一个 kafka 消费者 java 应用程序。 我正在阅读该应用程序中的两个单独的 kafka 主题。
对于主题1,有一个名为kafkaListenerContainerFactory 的KafkaListenerContainerFactory,下面是代码片段。该消息采用 avro 格式。 Pojo1 是 pojo class 使用 avro 架构构建。
@Bean
public ConcurrentKafkaListenerContainerFactory<String, Pojo1> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, Pojo1> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
return factory;
}
为了消费来自主题 1 的消息,我们有以下方法
@kafkaListener (topics="topic1") public boolean readTopic1(ConsumerRecord<String,Pojo1> record){ // logic }
现在这个应用程序也在读取来自 topic2 的消息,它也有 avro 消息,Pojo2 是使用各自的 avro 模式构建的 pojo class。
要阅读topic2,我们有以下方法
@kafkaListener (topics="topic2") public boolean readTopic2(ConsumerRecord<String,Pojo2> record){ // logic }
现在我无法了解来自 topic2 的消费消息如何按预期工作。
kafkaListenerContainerFactory 是用 Pojo1 配置的,那么如何从 Pojo2 的 topic2 读取消息。 据我所知,只有在缺少名为“kafkaListenerContainerFactory”的 bean 时,kafka 才会创建默认容器工厂,但在我的应用程序中 我们已经为 topic1 构建了一个 kafkaListenerContainerFactory。
根据Pojo2应该有另外一个KafkaListenerContainerFactory创建,使用时应该给出参考@kafkaListener (topics="topic2", containerFactory="factoryForTopic2")
谁能帮助我了解 KafkaListenerContainerFactory 如何针对具有不同消息模式的不同主题工作。
这叫做类型擦除;通用类型仅在编译时适用。在运行时,只要您的反序列化器创建正确的类型,它就会工作。
为了避免混淆,使用 ConcurrentKafkaListenerContainerFactory<String, Object>
可能更简洁;特别是在使用可以创建不同类型的 avro 或 json 反序列化器时。
也就是说,您的 bean 名为 importDecisionMessageKafkaListenerContainerFactory
。对于使用非标准工厂的监听器,它的名称必须在监听器的 containerFactory
属性 中指定;否则使用 kafkaListenerContainerFactory
。
这仅适用于您的情况,因为反序列化器能够将 String 反序列化为两个对象。但是假设您使用 Avro,并且您仍然同时拥有 Pojo1 和 Pojo2。在这种情况下,您将需要两个容器工厂。如果您不为 Pojo2 提供一个异常,将抛出一个异常 - MessageConversionException: cannot convert String to Pojo2 for GenericMessage
。解决方案是定义 @kafkaListener (topics="topic2", containerFactory="factoryForTopic2")