spring 设置并发后kafka thorws InstanceAlreadyExistsException异常 > 1
spring kafka thorws InstanceAlreadyExistsException exception after setting concurrency > 1
我正在使用spring-kafka,如果我不设置ConcurrentKafkaListenerContainerFactory的并发度,一切正常,当我将其设置为大于1的数字时,出现异常:
javax.management.InstanceAlreadyExistsException:
kafka.consumer:type=app-info,id=client-3
我的配置:
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String>
kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new
ConcurrentKafkaListenerContainerFactory<String, String>();
factory.setConcurrency(kafkaConfig.getConcurrency());
factory.getContainerProperties().setAckMode(AckMode.MANUAL);
factory.setConsumerFactory(consumerFactory());
return factory;
}
属性:
kafka.enable-auto-commit=false
kafka.client-id=client-1
kafka.concurrency=2
我打开了一个issue for this on github。目前不支持为每个线程设置不同的 client.id
。
作为变通方法,您可以为每个单独启动一个 KafkaMessageListenerContainer
(ConcurrentMessageListenerContainer
在内部执行的操作)。
编辑
虽然不理想,但您可以省略 client.id
并且 kafka 客户端将为每个生成一个(consumer-1
、consumer-2
等)
我正在使用spring-kafka,如果我不设置ConcurrentKafkaListenerContainerFactory的并发度,一切正常,当我将其设置为大于1的数字时,出现异常:
javax.management.InstanceAlreadyExistsException: kafka.consumer:type=app-info,id=client-3
我的配置:
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String>
kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new
ConcurrentKafkaListenerContainerFactory<String, String>();
factory.setConcurrency(kafkaConfig.getConcurrency());
factory.getContainerProperties().setAckMode(AckMode.MANUAL);
factory.setConsumerFactory(consumerFactory());
return factory;
}
属性:
kafka.enable-auto-commit=false
kafka.client-id=client-1
kafka.concurrency=2
我打开了一个issue for this on github。目前不支持为每个线程设置不同的 client.id
。
作为变通方法,您可以为每个单独启动一个 KafkaMessageListenerContainer
(ConcurrentMessageListenerContainer
在内部执行的操作)。
编辑
虽然不理想,但您可以省略 client.id
并且 kafka 客户端将为每个生成一个(consumer-1
、consumer-2
等)