如何用Spring-Kafka实现消费者线程安全

How to implement consumer Thread Safety with Spring-Kafka

我正在使用 spring boot 2.1.7.RELEASE 和 spring-kafka 2.2.7.RELEASE。我正在使用 @KafkaListener 注释来创建消费者我正在为消费者使用所有默认设置。

这是我的消费者配置:

@Configuration
@EnableKafka
public class KafkaConsumerCommonConfig implements KafkaListenerConfigurer {

      @Bean
      public <K,V> ConcurrentKafkaListenerContainerFactory<String, Object> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(primaryConsumerFactory());
        factory.getContainerProperties().setMissingTopicsFatal(false);
        return factory;
      }

      @Bean
      public DefaultKafkaConsumerFactory<Object, Object> primaryConsumerFactory() {
        return new DefaultKafkaConsumerFactory<>(sapphireKafkaConsumerConfig.getConfigs());
      }

}

由于某些原因,我在同一个应用程序中有多个消费者,如下所示。

@KafkaListener(topics = "TEST_TOPIC1")
public void consumer1(){

}

@KafkaListener(topics = "TEST_TOPIC2")
public void consumer2(){

}

@KafkaListener(topics = "TEST_TOPIC3")
public void consumer3(){

}

话虽如此,根据 'Consumer Thread Safety'

上的汇合文档

You can’t have multiple consumers that belong to the same group in one thread and you can’t have multiple threads safely use the same consumer. One consumer per thread is the rule. To run mul‐ tiple consumers in the same group in one application, you will need to run each in its own thread. It is useful to wrap the con‐ sumer logic in its own object and then use Java’s ExecutorService to start multiple threads each with its own consumer.

现在我的问题是,在使用上述代码并使用 spring-kafka 来解决这种情况时,我是否应该做任何额外的事情,或者没关系,因为该组将随机生成,因为我没有指定?请提出建议。

您的所有听众都针对不同的主题进行了配置。所以,他们的团队完全无所谓。当同一主题有多个消费者时,该组就会出现。