多个消费者使用 spring kafka

Multiple consumers using spring kafka

我希望在我的应用程序中针对某个 Kafka 主题设置多个侦听器。下面是我的配置。消息应该被两个消费者组分别消费,但实际上只有一个侦听器在消费。我在这里遗漏了什么?

/**
 * Builds the property map used to configure Kafka consumers.
 *
 * <p>The map carries the bootstrap servers, the consumer group id and the
 * key/value deserializers (both {@code StringDeserializer}).
 *
 * @return a mutable map of Kafka consumer configuration properties
 */
@Bean
public Map<String, Object> consumerConfigs() {
    final Map<String, Object> config = new HashMap<>();

    // Connection and consumer-group settings.
    config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    config.put(ConsumerConfig.GROUP_ID_CONFIG, groupName);

    // Keys and values are both read as plain strings.
    config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);

    return config;
}

/**
 * Creates the consumer factory backed by the properties from
 * {@link #consumerConfigs()}.
 *
 * <p>Consumers created by this factory are configured from that single
 * property map, including its {@code group.id} entry.
 *
 * @return a consumer factory producing {@code String}/{@code String} consumers
 */
@Bean
public ConsumerFactory<String, String> consumerFactory() {
    // Diamond operator instead of the raw type: keeps the generic parameters
    // checked by the compiler and avoids an unchecked-conversion warning.
    return new DefaultKafkaConsumerFactory<>(consumerConfigs());
}

/**
 * Creates a listener container factory that uses {@link #consumerFactory()}
 * and a concurrency of 100.
 *
 * @return the configured listener container factory
 */
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
    final ConcurrentKafkaListenerContainerFactory<String, String> containerFactory =
            new ConcurrentKafkaListenerContainerFactory<>();
    containerFactory.setConsumerFactory(consumerFactory());
    containerFactory.setConcurrency(100);
    return containerFactory;
}

/**
 * Creates the listener container factory registered under the bean name
 * {@code notificationFactory}.
 *
 * <p>It uses {@link #consumerFactory()} and a concurrency of 100.
 *
 * @return the configured listener container factory
 */
@Bean("notificationFactory")
public ConcurrentKafkaListenerContainerFactory<String, String> notificationFactory() {
    final ConcurrentKafkaListenerContainerFactory<String, String> containerFactory =
            new ConcurrentKafkaListenerContainerFactory<>();
    containerFactory.setConsumerFactory(consumerFactory());
    containerFactory.setConcurrency(100);
    return containerFactory;
}

/**
 * Creates the listener container factory registered under the bean name
 * {@code insertContainerFactory}.
 *
 * <p>It uses {@link #consumerFactory()} and a concurrency of 100.
 *
 * @return the configured listener container factory
 */
@Bean("insertContainerFactory")
public ConcurrentKafkaListenerContainerFactory<String, String> insertContainerFactory() {
    final ConcurrentKafkaListenerContainerFactory<String, String> containerFactory =
            new ConcurrentKafkaListenerContainerFactory<>();
    containerFactory.setConsumerFactory(consumerFactory());
    containerFactory.setConcurrency(100);
    return containerFactory;
}

/**
 * Receives a message from the configured topic and passes it to
 * {@code locationProcessor.insertLocationData}.
 *
 * <p>NOTE(review): the {@code group} attribute used here is presumably not the
 * Kafka consumer {@code group.id}; confirm against the {@code @KafkaListener}
 * javadoc for the Spring Kafka version in use.
 *
 * @param message the message payload
 */
@KafkaListener(
        id = "insert_listener",
        group = "insert_listener",
        topics = "${kafka.topic.readlocation}",
        containerFactory = "insertContainerFactory")
public void receiveForInsert(String message) {
    locationProcessor.insertLocationData(message);
}

/**
 * Receives a message from the configured topic and passes it to
 * {@code locationProcessor.processNotificationMessage}.
 *
 * <p>NOTE(review): the {@code group} attribute used here is presumably not the
 * Kafka consumer {@code group.id}; confirm against the {@code @KafkaListener}
 * javadoc for the Spring Kafka version in use.
 *
 * @param message the message payload
 */
@KafkaListener(
        id = "notification_listener",
        group = "notification_listener",
        topics = "${kafka.topic.readlocation}",
        containerFactory = "notificationFactory")
public void receiveForNotification(String message) {
    locationProcessor.processNotificationMessage(message);
}

编辑:下面是有效的代码

/**
 * Receives a message from the configured topic and passes it to
 * {@code locationProcessor.insertLocationData}.
 *
 * <p>The listener declares its group through the {@code groupId} attribute.
 *
 * @param message the message payload
 */
@KafkaListener(
        id = "insert_listener",
        groupId = "insert_listener",
        topics = "${kafka.topic.readlocation}")
public void receiveForInsert(String message) {
    locationProcessor.insertLocationData(message);
}

每个侦听器都需要不同的 group.id;group 属性并不是 group.id——请参阅 javadoc。在即将发布的 1.3 版本中,有一个新的 groupId 属性;此外,如果设置了 id,我们也可以将 id 用作消费者组。

对于早期版本,您需要为每个侦听器使用不同的消费者工厂。