How to create multiple instances of KafkaReceiver in Spring Reactor Kafka

I have a reactive Kafka application that reads data from one topic and writes to another. The source topic has multiple partitions, and I want to create as many consumers (all in the same consumer group) as there are partitions in the topic. As far as I understand, .receive() creates only one KafkaReceiver instance, which reads from all partitions of the topic, so I need multiple receivers to read the different partitions in parallel.

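For context, this is the single-receiver baseline I am referring to; a minimal sketch assuming the kafkaConsumerTemplate bean defined further below:

    // One subscription to receiveAutoAck() = one receive pipeline,
    // consuming from every partition that gets assigned to that single consumer.
    kafkaConsumerTemplate
            .receiveAutoAck()                 // Flux<ConsumerRecord<String, String>>
            .doOnNext(r -> log.info("partition={} offset={}", r.partition(), r.offset()))
            .subscribe();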
To achieve this, I came up with the following code:

@Bean
    public ReceiverOptions<String, String> kafkaReceiverOptions(String topic, KafkaProperties kafkaProperties) {
        ReceiverOptions<String, String> basicReceiverOptions = ReceiverOptions.create(kafkaProperties.buildConsumerProperties());
        return basicReceiverOptions.subscription(Collections.singletonList(topic))
                .addAssignListener(receiverPartitions -> log.debug("onPartitionAssigned {}", receiverPartitions))
                .addRevokeListener(receiverPartitions -> log.debug("onPartitionsRevoked {}", receiverPartitions));
    }

    @Bean
    public ReactiveKafkaConsumerTemplate<String, String> kafkaConsumerTemplate(ReceiverOptions<String, String> kafkaReceiverOptions) {
        return new ReactiveKafkaConsumerTemplate<String, String>(kafkaReceiverOptions);
    }

    @Bean
    public ReactiveKafkaProducerTemplate<String, List<Object>> kafkaProducerTemplate(
            KafkaProperties properties) {
        Map<String, Object> props = properties.buildProducerProperties();
        return new ReactiveKafkaProducerTemplate<String, List<Object>>(SenderOptions.create(props));
    }


public void run(String... args) {

        for(int i = 0; i < topicPartitionsCount ; i++) {
            readWrite(destinationTopic).subscribe();
        }
}


public Flux<String> readWrite(String destTopic) {
        return kafkaConsumerTemplate
                .receiveAutoAck()
                .doOnNext(consumerRecord -> log.info("received key={}, value={} from topic={}, offset={}",
                        consumerRecord.key(),
                        consumerRecord.value(),
                        consumerRecord.topic(),
                        consumerRecord.offset())
                )
                .doOnNext(consumerRecord -> log.info("Record received from partition {} in thread {}", consumerRecord.partition(),Thread.currentThread().getName()))
                .doOnNext(s-> sendToKafka(s,destTopic))
                .map(ConsumerRecord::value)               
                .onErrorContinue((exception,errorConsumer)->{
                    log.error("Error while consuming : {}", exception.getMessage());
                });
    }

public void sendToKafka(ConsumerRecord<String, String> consumerRecord, String destTopic){
   kafkaProducerTemplate.send(destTopic, consumerRecord.key(), transformRecord(consumerRecord))
                    .doOnNext(senderResult -> log.info("Record received from partition {} in thread {}", consumerRecord.partition(),Thread.currentThread().getName()))
                    .doOnSuccess(senderResult -> {
                        log.debug("Sent {} offset : {}", metrics, senderResult.recordMetadata().offset());
                    })
                    .doOnError(exception -> {
                        log.error("Error while sending message to destination topic : {}", exception.getMessage());
                    })
                    .subscribe();
}

When I tested this it seemed to work fine: multiple KafkaReceiver instances were created and the partitions were processed in parallel. My question is: is this the most efficient way to create multiple instances, or is there another way to do this in reactive Kafka?

What you are doing is correct.
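Subscribing to readWrite() once per partition is exactly how you fan out with Reactor Kafka: each subscription to receiveAutoAck() gets its own underlying Kafka consumer in the same group, so the broker spreads the partitions across them, which matches what you observed. If you would rather manage one pipeline (a single Disposable to cancel on shutdown) instead of a bare loop, the same thing can be written as one merged Flux; a minimal sketch, reusing the readWrite() method, destinationTopic and topicPartitionsCount from your code:

    // flatMap subscribes to each inner readWrite() Flux, so you still get
    // topicPartitionsCount concurrent receive pipelines in the same consumer group,
    // but only one Disposable to dispose() when the application stops.
    Disposable consumers = Flux.range(0, topicPartitionsCount)
            .flatMap(i -> readWrite(destinationTopic))
            .subscribe();

The other pattern the Reactor Kafka reference documents for per-partition parallelism is a single receive() followed by groupBy on the record's topic-partition, processing each group on its own scheduler; that keeps a single consumer, so it is only equivalent if you do not actually need separate consumers in the group.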