在反应式 Kafka 中暂停 kafka 读取
Pause kafka read in reactive Kafka
我有一个使用反应式 kafka 从 kafka 读取数据的应用程序。下面是从kafka读取的代码:
public Flux<String> readFromKafka() {
return kafkaConsumerTemplate
.receiveAutoAck()
.doOnNext(consumerRecord -> log.debug("received key={}, value={} from topic={}, offset={}",
consumerRecord.key(),
consumerRecord.value(),
consumerRecord.topic(),
consumerRecord.offset())
)
.doOnNext(s-> processData(s))
.map(ConsumerRecord::value)
.doOnNext(c -> log.debug("successfully consumed c{}", c))
.doOnError(exception -> log.error("Error occurred while processing the message, attempting retry. Error message: {} {}", exception.getMessage(), exception))
.retryWhen(Retry.backoff(Integer.parseInt(retryAttempts), Duration.ofSeconds(Integer.parseInt(retryAttemptsDelay))).transientErrors(true))
.onErrorResume(exception -> {
log.error("Kafka read retries exhausted : {} {}",exception.getMessage(), exception.toString());
return Flux.empty();
});
}
我有一个要求,我需要在预定时间暂停此读取,以便任务处理某些内容并在任务完成后恢复。这是我添加的代码
public void pauseKafkaRead() {
List<TopicPartition> list = kafkaConsumerTemplate.assignment().collectList().block();
for (TopicPartition partition: list){
log.info("Partition: " + partition.toString());
kafkaConsumerTemplate.pause(partition);
}
}
public void resumeKafkaRead() {
List<TopicPartition> list = kafkaConsumerTemplate.assignment().collectList().block();
for (TopicPartition partition: list) {
log.info("Partition: " + partition.toString());
kafkaConsumerTemplate.resume(partition);
}
}
我在开始执行任务时调用 pauseKafkaRead() 方法,并在任务完成后调用 resume。
但是,这似乎并没有暂停阅读。继续从 Kafka 读取数据并进行处理。有人可以帮助我了解我在这里缺少什么吗?
暂停或恢复时的 Kafka 正在为您提供单声道,因此您需要订阅该流才能使其执行。
你可以这样试试
public void pauseKafkaRead() {
kafkaConsumerTemplate.assignment()
.flatMap(o -> kafkaConsumerTemplate.pause(o))
.subscribe();
}
public void resumeKafkaRead() {
kafkaConsumerTemplate.assignment()
.flatMap(o -> kafkaConsumerTemplate.resume(o))
.subscribe();
}
我有一个使用反应式 kafka 从 kafka 读取数据的应用程序。下面是从kafka读取的代码:
public Flux<String> readFromKafka() {
return kafkaConsumerTemplate
.receiveAutoAck()
.doOnNext(consumerRecord -> log.debug("received key={}, value={} from topic={}, offset={}",
consumerRecord.key(),
consumerRecord.value(),
consumerRecord.topic(),
consumerRecord.offset())
)
.doOnNext(s-> processData(s))
.map(ConsumerRecord::value)
.doOnNext(c -> log.debug("successfully consumed c{}", c))
.doOnError(exception -> log.error("Error occurred while processing the message, attempting retry. Error message: {} {}", exception.getMessage(), exception))
.retryWhen(Retry.backoff(Integer.parseInt(retryAttempts), Duration.ofSeconds(Integer.parseInt(retryAttemptsDelay))).transientErrors(true))
.onErrorResume(exception -> {
log.error("Kafka read retries exhausted : {} {}",exception.getMessage(), exception.toString());
return Flux.empty();
});
}
我有一个要求,我需要在预定时间暂停此读取,以便任务处理某些内容并在任务完成后恢复。这是我添加的代码
public void pauseKafkaRead() {
List<TopicPartition> list = kafkaConsumerTemplate.assignment().collectList().block();
for (TopicPartition partition: list){
log.info("Partition: " + partition.toString());
kafkaConsumerTemplate.pause(partition);
}
}
public void resumeKafkaRead() {
List<TopicPartition> list = kafkaConsumerTemplate.assignment().collectList().block();
for (TopicPartition partition: list) {
log.info("Partition: " + partition.toString());
kafkaConsumerTemplate.resume(partition);
}
}
我在开始执行任务时调用 pauseKafkaRead() 方法,并在任务完成后调用 resume。 但是,这似乎并没有暂停阅读。继续从 Kafka 读取数据并进行处理。有人可以帮助我了解我在这里缺少什么吗?
暂停或恢复时的 Kafka 正在为您提供单声道,因此您需要订阅该流才能使其执行。 你可以这样试试
public void pauseKafkaRead() {
kafkaConsumerTemplate.assignment()
.flatMap(o -> kafkaConsumerTemplate.pause(o))
.subscribe();
}
public void resumeKafkaRead() {
kafkaConsumerTemplate.assignment()
.flatMap(o -> kafkaConsumerTemplate.resume(o))
.subscribe();
}