如何在 reactor-kafka 中重试失败的 ConsumerRecord
How to retry failed ConsumerRecord in reactor-kafka
我正在尝试使用 reactor-kafka 来消费消息。其他一切工作正常,但我想为失败的消息添加重试(2)。 spring-kafka默认已经重试失败记录3次,我想用reactor-kafka实现同样的效果。
我正在使用 spring-kafka 作为 reactive-kafka 的包装器。下面是我的消费者模板:
reactiveKafkaConsumerTemplate
.receiveAutoAck()
.map(ConsumerRecord::value)
.flatMap(this::consumeWithRetry)
.onErrorContinue((error, value)->log.error("something bad happened while consuming : {}", error.getMessage()))
.retryWhen(Retry.backoff(30, Duration.of(10, ChronoUnit.SECONDS)))
.subscribe();
让我们考虑消费方法如下
public Mono<Void> consume(MessageRecord message){
return Mono.error(new RuntimeException("test retry"); //sample error scenario
}
我正在使用以下逻辑在失败时重试消费方法。
public Mono<Void> consumeWithRetry(MessageRecord message){
return consume(message)
.retry(2);
}
如果当前消费者记录因异常而失败,我想重试消费该消息。我试图用另一个 retry(3) 包装 consume 方法,但这没有达到目的。最后一个 retryWhen 仅用于在 kafka rebalances 上重试订阅。
@simon-baslé @gary-russell
之前在重试时我使用了以下方法:
public Mono<Void> consumeWithRetry(MessageRecord message){
return consume(message)
.retry(2);
}
但它没有重试。添加 Mono.defer 后,上面的代码有效并添加了所需的重试。
public Mono<Void> consumeWithRetry(MessageRecord message){
return Mono.defer(()->consume(message))
.retry(2);
}
我正在尝试使用 reactor-kafka 来消费消息。其他一切工作正常,但我想为失败的消息添加重试(2)。 spring-kafka默认已经重试失败记录3次,我想用reactor-kafka实现同样的效果。
我正在使用 spring-kafka 作为 reactive-kafka 的包装器。下面是我的消费者模板:
reactiveKafkaConsumerTemplate
.receiveAutoAck()
.map(ConsumerRecord::value)
.flatMap(this::consumeWithRetry)
.onErrorContinue((error, value)->log.error("something bad happened while consuming : {}", error.getMessage()))
.retryWhen(Retry.backoff(30, Duration.of(10, ChronoUnit.SECONDS)))
.subscribe();
让我们考虑消费方法如下
public Mono<Void> consume(MessageRecord message){
return Mono.error(new RuntimeException("test retry"); //sample error scenario
}
我正在使用以下逻辑在失败时重试消费方法。
public Mono<Void> consumeWithRetry(MessageRecord message){
return consume(message)
.retry(2);
}
如果当前消费者记录因异常而失败,我想重试消费该消息。我试图用另一个 retry(3) 包装 consume 方法,但这没有达到目的。最后一个 retryWhen 仅用于在 kafka rebalances 上重试订阅。
@simon-baslé @gary-russell
之前在重试时我使用了以下方法:
public Mono<Void> consumeWithRetry(MessageRecord message){
return consume(message)
.retry(2);
}
但它没有重试。添加 Mono.defer 后,上面的代码有效并添加了所需的重试。
public Mono<Void> consumeWithRetry(MessageRecord message){
return Mono.defer(()->consume(message))
.retry(2);
}