在反应式 Kafka 中暂停 kafka 读取

Pause kafka read in reactive Kafka

我有一个使用反应式 kafka 从 kafka 读取数据的应用程序。下面是从kafka读取的代码:

public Flux<String> readFromKafka() {
        return kafkaConsumerTemplate
                .receiveAutoAck()
                .doOnNext(consumerRecord -> log.debug("received key={}, value={} from topic={}, offset={}",
                        consumerRecord.key(),
                        consumerRecord.value(),
                        consumerRecord.topic(),
                        consumerRecord.offset())
                )
                .doOnNext(s-> processData(s))
                .map(ConsumerRecord::value)
                .doOnNext(c -> log.debug("successfully consumed c{}", c))
                .doOnError(exception -> log.error("Error occurred while processing the message, attempting retry. Error message: {} {}", exception.getMessage(), exception))
                .retryWhen(Retry.backoff(Integer.parseInt(retryAttempts), Duration.ofSeconds(Integer.parseInt(retryAttemptsDelay))).transientErrors(true))
                .onErrorResume(exception -> {
                    log.error("Kafka read retries exhausted : {} {}",exception.getMessage(), exception.toString());
                    return Flux.empty();
                });
    }

我有一个要求,我需要在预定时间暂停此读取,以便任务处理某些内容并在任务完成后恢复。这是我添加的代码

   public void pauseKafkaRead() {
        List<TopicPartition> list = kafkaConsumerTemplate.assignment().collectList().block();
        for (TopicPartition partition: list){
            log.info("Partition: " + partition.toString());
            kafkaConsumerTemplate.pause(partition);
        }
    }

    public void resumeKafkaRead() {
        List<TopicPartition> list = kafkaConsumerTemplate.assignment().collectList().block();
        for (TopicPartition partition: list) {
            log.info("Partition: " + partition.toString());
            kafkaConsumerTemplate.resume(partition);
        }
    }

我在开始执行任务时调用 pauseKafkaRead() 方法,并在任务完成后调用 resume。 但是,这似乎并没有暂停阅读。继续从 Kafka 读取数据并进行处理。有人可以帮助我了解我在这里缺少什么吗?

暂停或恢复时的 Kafka 正在为您提供单声道,因此您需要订阅该流才能使其执行。 你可以这样试试

  public void pauseKafkaRead() {
    kafkaConsumerTemplate.assignment()
        .flatMap(o -> kafkaConsumerTemplate.pause(o))
        .subscribe();

  }
  public void resumeKafkaRead() {
    kafkaConsumerTemplate.assignment()
        .flatMap(o -> kafkaConsumerTemplate.resume(o))
        .subscribe();
  }