使用 onErrorResume 处理使用 Reactor Kafka 发布到 Kafka 的有问题的负载

Using onErrorResume to handle problematic payloads posted to Kafka using Reactor Kafka

我正在使用 reactor kafka 发送 kafka 消息并接收和处理它们。 在接收 kakfa 有效载荷时,我进行了一些反序列化,如果出现异常,我只想记录该有效载荷(通过保存到 mongo ),然后继续接收其他有效载荷。

为此,我使用以下方法 -

@EventListener(ApplicationStartedEvent.class)
public void kafkaReceiving() {
   for(Flux<ReceiverRecord<String, Object>> flux: kafkaService.getFluxReceives()) {
       flux.delayUntil(//some function to do something)
       .doOnNext(r -> r.receiverOffset().acknowledge())
       .onErrorResume(this::handleException()) // here I'll just save to mongo 
       .subscribe();
   }
}


private Publisher<? extends ReceiverRecord<String,Object>> handleException(object ex) {
 // save to mongo
 return Flux.empty();
}

这里我希望每当我在接收有效载荷时遇到异常,onErrorResume 应该捕获它并记录到 mongo 然后我应该很好地继续接收更多消息当我发送到 kafka 队列时.但是,我看到异常发生后,即使调用了 onErrorResume 方法,但我无法再处理发送到 Kakfa 主题的消息。 我在这里可能遗漏了什么吗?

如果需要优雅的处理错误,可以在delayUntil里面加上onErrorResume:

flux
    .delayUntil(r -> {
        return process(r)
            .onErrorReturn(e -> saveToMongo(r));
    });
    .doOnNext(r -> r.receiverOffset().acknowledge())
    .subscribe();

响应式运算符将错误视为终止信号,并且,如果您的内部逻辑(在 delayUntil 内)抛出错误,delayUntil 将终止序列,并且 onErrorReturn after delayUntil 不会让它继续处理来自 Kafka 的事件。

正如@bsideup 所提到的,我最终没有从反序列化器中抛出异常,因为 kafka 无法为该记录提交偏移量,并且没有干净的方法来忽略该记录并继续随着记录的进一步消耗,因为我们没有记录的偏移信息(因为它格式错误)。因此,即使我尝试使用反应性错误运算符忽略记录,轮询也会获取相同的记录,然后消费者就会有点卡住