Spring Kafka 监听器执行器
Spring Kafka listenerExecutor
我正在 spring 引导应用程序中设置一个 kafka 侦听器,我似乎无法使用执行程序在池中获取侦听器 运行ning。这是我的卡夫卡配置:
@Bean
ThreadPoolTaskExecutor messageProcessorExecutor() {
logger.info("Creating a message processor pool with {} threads", numThreads);
ThreadPoolTaskExecutor exec = new ThreadPoolTaskExecutor();
exec.setCorePoolSize(200);
exec.setMaxPoolSize(200);
exec.setKeepAliveSeconds(30);
exec.setAllowCoreThreadTimeOut(true);
exec.setQueueCapacity(0); // Yields a SynchronousQueue
exec.setThreadFactory(ThreadFactoryFactory.defaultNamingFactory("kafka", "processor"));
return exec;
}
@Bean
public ConsumerFactory<String, PollerJob> consumerFactory() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ConsumerConfig.GROUP_ID_CONFIG, consumerGroup);
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
DefaultKafkaConsumerFactory<String, PollerJob> factory = new DefaultKafkaConsumerFactory<>(props,
new StringDeserializer(),
new JsonDeserializer<>(PollerJob.class));
return factory;
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, PollerJob> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, PollerJob> factory
= new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.setConcurrency(Integer.valueOf(kafkaThreads));
factory.getContainerProperties().setListenerTaskExecutor(messageProcessorExecutor());
factory.getContainerProperties().setAckMode(AbstractMessageListenerContainer.AckMode.MANUAL);
return factory;
}
ThreadPoolTaskExecutor
使用的 ThreadFactoryFactory
只是确保线程被命名为 'kafka-1-processor-1'
。
ConsumerFactory
将 ENABLE_AUTO_COMMIT_CONFIG
标志设置为 false,我使用手动模式进行确认,这是根据 documentation.[=20 使用执行程序所必需的=]
我的监听器看起来像这样:
@KafkaListener(topics = "my_topic",
group = "my_group",
containerFactory = "kafkaListenerContainerFactory")
public void listen(@Payload SomeJob job, Acknowledgment ack) {
ack.acknowledge();
logger.info("Running job {}", job.getId());
....
}
使用管理服务器,我可以检查所有线程,并且只创建了一个 kafka-N-processor-N
线程,但我预计最多可以看到 200 个。作业全部 运行 一次一个在那个线程上,我不明白为什么。
如何使用我的执行程序和尽可能多的线程来设置 运行 侦听器?
我正在使用 Spring Boot 1.5.4.RELEASE 和 kafka 0.11.0.0.
如果您的主题只有一个分区,根据消费者组策略,只有一个消费者能够轮询该分区。
ConcurrentMessageListenerContainer
确实创建了与提供的 concurrency
一样多的目标 KafkaMessageListenerContainer
实例。只有在它不知道主题中的分区数时才会这样做。
当consumer group rebalance发生时,只有一个consumer得到partition进行消费。所有的工作实际上都是在一个线程中完成的:
private void startInvoker() {
ListenerConsumer.this.invoker = new ListenerInvoker();
ListenerConsumer.this.listenerInvokerFuture = this.containerProperties.getListenerTaskExecutor()
.submit(ListenerConsumer.this.invoker);
}
一个分区 - 一个用于顺序记录处理的线程。
我正在 spring 引导应用程序中设置一个 kafka 侦听器,我似乎无法使用执行程序在池中获取侦听器 运行ning。这是我的卡夫卡配置:
@Bean
ThreadPoolTaskExecutor messageProcessorExecutor() {
logger.info("Creating a message processor pool with {} threads", numThreads);
ThreadPoolTaskExecutor exec = new ThreadPoolTaskExecutor();
exec.setCorePoolSize(200);
exec.setMaxPoolSize(200);
exec.setKeepAliveSeconds(30);
exec.setAllowCoreThreadTimeOut(true);
exec.setQueueCapacity(0); // Yields a SynchronousQueue
exec.setThreadFactory(ThreadFactoryFactory.defaultNamingFactory("kafka", "processor"));
return exec;
}
@Bean
public ConsumerFactory<String, PollerJob> consumerFactory() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ConsumerConfig.GROUP_ID_CONFIG, consumerGroup);
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
DefaultKafkaConsumerFactory<String, PollerJob> factory = new DefaultKafkaConsumerFactory<>(props,
new StringDeserializer(),
new JsonDeserializer<>(PollerJob.class));
return factory;
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, PollerJob> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, PollerJob> factory
= new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.setConcurrency(Integer.valueOf(kafkaThreads));
factory.getContainerProperties().setListenerTaskExecutor(messageProcessorExecutor());
factory.getContainerProperties().setAckMode(AbstractMessageListenerContainer.AckMode.MANUAL);
return factory;
}
ThreadPoolTaskExecutor
使用的 ThreadFactoryFactory
只是确保线程被命名为 'kafka-1-processor-1'
。
ConsumerFactory
将 ENABLE_AUTO_COMMIT_CONFIG
标志设置为 false,我使用手动模式进行确认,这是根据 documentation.[=20 使用执行程序所必需的=]
我的监听器看起来像这样:
@KafkaListener(topics = "my_topic",
group = "my_group",
containerFactory = "kafkaListenerContainerFactory")
public void listen(@Payload SomeJob job, Acknowledgment ack) {
ack.acknowledge();
logger.info("Running job {}", job.getId());
....
}
使用管理服务器,我可以检查所有线程,并且只创建了一个 kafka-N-processor-N
线程,但我预计最多可以看到 200 个。作业全部 运行 一次一个在那个线程上,我不明白为什么。
如何使用我的执行程序和尽可能多的线程来设置 运行 侦听器?
我正在使用 Spring Boot 1.5.4.RELEASE 和 kafka 0.11.0.0.
如果您的主题只有一个分区,根据消费者组策略,只有一个消费者能够轮询该分区。
ConcurrentMessageListenerContainer
确实创建了与提供的 concurrency
一样多的目标 KafkaMessageListenerContainer
实例。只有在它不知道主题中的分区数时才会这样做。
当consumer group rebalance发生时,只有一个consumer得到partition进行消费。所有的工作实际上都是在一个线程中完成的:
private void startInvoker() {
ListenerConsumer.this.invoker = new ListenerInvoker();
ListenerConsumer.this.listenerInvokerFuture = this.containerProperties.getListenerTaskExecutor()
.submit(ListenerConsumer.this.invoker);
}
一个分区 - 一个用于顺序记录处理的线程。