Spring @KafkaListener 和并发

Spring @KafkaListener and concurrency

我正在使用 spring boot + spring @KafkaListener。我期望的行为是:我的 kafka 侦听器在 10 个线程中读取消息。因此,如果其中一个线程挂起,其他消息将继续读取和处理消息。

我定义了

的bean
@Bean
public ConcurrentKafkaListenerContainerFactory<?, ?> kafkaListenerContainerFactory(
        ConcurrentKafkaListenerContainerFactoryConfigurer configurer,
        ConsumerFactory<Object, Object> kafkaConsumerFactory)
{

    ConcurrentKafkaListenerContainerFactory<Object, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
    configurer.configure(factory, kafkaConsumerFactory);
    factory.getContainerProperties().setMissingTopicsFatal(false);
    factory.getContainerProperties().setCommitLogLevel(LogIfLevelEnabled.Level.INFO);
    return factory;
}

和spring启动配置:

spring.kafka.listener.concurrency=10

我看到所有配置都有效,我在 jmx 中看到我的 10 个线程:

但是后来我做了这样的测试:

 @KafkaListener(topics = {
            "${topic.name}" }, clientIdPrefix = "${kafka.client.id.prefix}", idIsGroup = false, id = "${kafka.listener.name}", containerFactory = "kafkaListenerContainerFactory")
    public void listen(ConsumerRecord<String, String> record)
    {
        if(record.getVersion() < 3) {
            try {
                Thread.sleep(20000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        else
            System.out.println("It works!");

    }

如果版本 < 3,则挂起,否则 - 工作。 我发送了 3 条版本为 1,2 和 3 的消息。我希望版本 1 和版本 2 的消息会挂起,但是版本 3 将在它到达侦听器时被处理。但不幸的是,版本 3 的消息在开始处理之前等待消息 1 和 2。

也许我的期望不是真的,这是kafka listener的正确行为。 请帮我处理kafka并发,为什么会这样?

Kafka 不是这样工作的;您至少需要与消费者一样多的分区(由 spring 容器中的 concurrency 控制)。

此外,一次只有一个消费者(在一个组中)可以从一个分区中消费,因此,即使您增加分区,"stuck" 消费者后面的同一分区中的记录也不会被接收其他消费者。

如果您想对 Kafka 进行故障转移,则必须启动更多应用程序实例。

示例:您有一个名为 test 的主题,有 1 个分区,您将使用相同的 Kafka 组创建 2 个应用程序实例。一个实例将处理您的数据,另一个实例将等待并开始处理消息,以防第一个实例崩溃。如果您有 N 个分区和 N + 1 或 2 或 3 个应用程序实例,则相同。此外,每个实例将只有一个消费者线程。

有关它的更多信息,请在 Google 上搜索:Kafka Consumer Groups