Spring Kafka 监听器执行器

Spring Kafka listenerExecutor

我正在 spring 引导应用程序中设置一个 kafka 侦听器,我似乎无法使用执行程序在池中获取侦听器 运行ning。这是我的卡夫卡配置:

@Bean
ThreadPoolTaskExecutor messageProcessorExecutor() {
    logger.info("Creating a message processor pool with {} threads", numThreads);
    ThreadPoolTaskExecutor exec = new ThreadPoolTaskExecutor();
    exec.setCorePoolSize(200);
    exec.setMaxPoolSize(200);
    exec.setKeepAliveSeconds(30);
    exec.setAllowCoreThreadTimeOut(true);
    exec.setQueueCapacity(0); // Yields a SynchronousQueue
    exec.setThreadFactory(ThreadFactoryFactory.defaultNamingFactory("kafka", "processor"));
    return exec;
}

@Bean
public ConsumerFactory<String, PollerJob> consumerFactory() {
    Map<String, Object> props = new HashMap<>();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    props.put(ConsumerConfig.GROUP_ID_CONFIG, consumerGroup);
    props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
    DefaultKafkaConsumerFactory<String, PollerJob> factory = new DefaultKafkaConsumerFactory<>(props,
            new StringDeserializer(),
            new JsonDeserializer<>(PollerJob.class));
    return factory;
}

@Bean
public ConcurrentKafkaListenerContainerFactory<String, PollerJob> kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, PollerJob> factory
            = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());
    factory.setConcurrency(Integer.valueOf(kafkaThreads));
    factory.getContainerProperties().setListenerTaskExecutor(messageProcessorExecutor());
    factory.getContainerProperties().setAckMode(AbstractMessageListenerContainer.AckMode.MANUAL);
    return factory;
}

ThreadPoolTaskExecutor 使用的 ThreadFactoryFactory 只是确保线程被命名为 'kafka-1-processor-1'

ConsumerFactoryENABLE_AUTO_COMMIT_CONFIG 标志设置为 false,我使用手动模式进行确认,这是根据 documentation.[=20 使用执行程序所必需的=]

我的监听器看起来像这样:

@KafkaListener(topics = "my_topic",
        group = "my_group",
        containerFactory = "kafkaListenerContainerFactory")
public void listen(@Payload SomeJob job, Acknowledgment ack) {
    ack.acknowledge();
    logger.info("Running job {}", job.getId());
    ....
}

使用管理服务器,我可以检查所有线程,并且只创建了一个 kafka-N-processor-N 线程,但我预计最多可以看到 200 个。作业全部 运行 一次一个在那个线程上,我不明白为什么。

如何使用我的执行程序和尽可能多的线程来设置 运行 侦听器?

我正在使用 Spring Boot 1.5.4.RELEASE 和 kafka 0.11.0.0.

如果您的主题只有一个分区,根据消费者组策略,只有一个消费者能够轮询该分区。

ConcurrentMessageListenerContainer 确实创建了与提供的 concurrency 一样多的目标 KafkaMessageListenerContainer 实例。只有在它不知道主题中的分区数时才会这样做。

当consumer group rebalance发生时,只有一个consumer得到partition进行消费。所有的工作实际上都是在一个线程中完成的:

 private void startInvoker() {
    ListenerConsumer.this.invoker = new ListenerInvoker();
    ListenerConsumer.this.listenerInvokerFuture = this.containerProperties.getListenerTaskExecutor()
            .submit(ListenerConsumer.this.invoker);
}

一个分区 - 一个用于顺序记录处理的线程。