spring 设置并发后kafka thorws InstanceAlreadyExistsException异常 > 1

spring kafka thorws InstanceAlreadyExistsException exception after setting concurrency > 1

我正在使用spring-kafka,如果我不设置ConcurrentKafkaListenerContainerFactory的并发度,一切正常,当我将其设置为大于1的数字时,出现异常:

javax.management.InstanceAlreadyExistsException: kafka.consumer:type=app-info,id=client-3

我的配置:

@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> 
    kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, String> factory = new 
    ConcurrentKafkaListenerContainerFactory<String, String>();

    factory.setConcurrency(kafkaConfig.getConcurrency());

    factory.getContainerProperties().setAckMode(AckMode.MANUAL);
    factory.setConsumerFactory(consumerFactory());
    return factory;
}

属性:

kafka.enable-auto-commit=false

kafka.client-id=client-1

kafka.concurrency=2

我打开了一个issue for this on github。目前不支持为每个线程设置不同的 client.id

作为变通方法,您可以为每个单独启动一个 KafkaMessageListenerContainerConcurrentMessageListenerContainer 在内部执行的操作)。

编辑

虽然不理想,但您可以省略 client.id 并且 kafka 客户端将为每个生成一个(consumer-1consumer-2 等)