反序列化异常后继续在reactor kafka中消费后续记录

Continue consuming subsequent records in reactor kafka after deserialization exception

我正在使用 reactor kafka 并有一个自定义的 AvroDeserializer class 用于消息的反序列化。

现在我有一个案例,对于某些有效负载,反序列化 class 抛出异常。 我的 Kafka 侦听器在尝试读取此类记录时立即死亡。 我尝试使用 onErrorReturn 并使用(doOnErroronErrorContinue 的组合来处理此异常,但是,它有助于记录异常,但未能使用后续记录。

public class AvroDeserializer<T extends SpecificRecordBase> implements Deserializer<T> {
   public T deserialize( String topic, byte[] data ) {
       try {
         //logic to deserialize
       }
       catch {
          // throw new SerializationException("error deserializing" );
       }
   }
}

在听众端,我试图这样处理 ->

@EventListener(ApplicationStartedEvent.class)

public class listener() {
   KakfaReceiver<String, Object> receiver; // kafka listener
   receiver.receive()
   .delayUntil(do something)//logic to update the record to db
   .doOnError(handle error)//trying to handle the exceptions including desrialization exception - logging them to a system
   .onErrorContinue((throwable, o) -> log.info("continuing"))
   .doOnNext(r->r.receiverOffset().acknowledge())
   .subscribe()

}

一个选项是不从反序列化器中抛出异常class,但我想在单独的系统中记录此类异常以供分析,因此希望在 kafka 侦听器端处理此类记录。 关于如何实现这一点有什么想法吗?

看起来像评论中关于使用

的建议
spring.cloud.stream.kafka.streams.binder.deserializationExceptionHandler=logAndContinue

大多数情况下都可以。但是我想不出让它在 Reactor Spring Kafka 中工作的方法。现在,我继续采用不从反序列化器抛出异常并添加逻辑将其记录在那里的方法,这解决了 Kafka 消费者无法在毒记录 [=11 上消费后续记录的问题=]

我认为你的问题是你确实抛出了一个异常,即使在反应流的上下文中,任何异常都是一个终端。只是不要抛出异常。

将你的 T(你从 deserialize() return)包装在 class 中,这可以指示反序列化是成功还是失败。如果您不关心确切的错误,您可以使用 Optional,其中空可选将指示反序列化错误。如果您使用的是 vavr 库,您可以从那里获取 Option 或 Try 以在失败端存储实际的反序列化。

例如,您的反序列化器看起来像

public class AvroDeserializer<T extends SpecificRecordBase> implements Deserializer<T> {
   public Optional<T> deserialize( String topic, byte[] data ) {
       try {
         //logic to deserialize
         return Optional.of(result);
       }
       catch {
          return Optional.empty();
       }
   }
}

现在你不再抛出,你的通量保持活跃,你可以过滤掉空的 Optional 实例。

P.S。 如果不想使用包装器类型,可以 return null 而不是抛出并过滤掉空值。不过我不建议这样做 - 更容易忘记 deserialize() 行为并稍后获得 NullPointerException。