根据异常类型调用 ContainerStoppingErrorHandler

Invoke ContainerStoppingErrorHandler based on exception type

我正在使用 spring kafka 2.2.4 版和 Kafka 2.11 版。我正在使用 ContainerStoppingErrorHandler 作为我的错误处理程序。每当出现异常时,都会调用此方法并停止容器。现在我需要根据异常类型停止容器,如果发生某些数据库异常,它应该停止其他异常类型的容器,它应该向组发送电子邮件。下面是我的错误处理代码

public ConcurrentKafkaListenerContainerFactory<byte[], byte[]> messageKafkaListenerContainerFactory() {
//consumer configs...
factory.setErrorHandler(new ContainerStoppingErrorHandler() {
        @Override
        public void handle(Exception thrownException, List<ConsumerRecord<?, ?>> records, Consumer<?, ?> consumer,
                MessageListenerContainer container) {


            if (thrownException instanceof ConnectionException) {

                LOGGER.error("Database exception occured stopping the container");
                super.handle(thrownException, records, consumer, container);

            } else {
                //send email about error without discarding the records
            }

        }
    }
}

我可以根据数据库异常停止容器,但对于其他异常,轮询中的记录(包括错误记录)将被丢弃,因此我丢失了数据。 是否有任何方法可以根据类型处理异常并在数据库异常停止时调用错误处理程序,否则继续而不丢弃剩余的记录。

对于其他例外情况,委托给 SeekToCurrentErrorHandler 这将导致为所有未处理的记录(包括失败的记录)寻找主题,以便在下一次轮询()时重新传送它们。

默认情况下,STCEH 在尝试 10 次后放弃失败的记录,但您可以通过设置 maxAttempts 构造函数参数来更改它。

编辑

factory.setErrorHandler(new ContainerStoppingErrorHandler() {

        private final SeekToCurrentErrorHandler stceh = new SeekToCurrentErrorHandler(...);

        @Override
        public void handle(Exception thrownException, List<ConsumerRecord<?, ?>> records, Consumer<?, ?> consumer,
                MessageListenerContainer container) {


            if (thrownException.getCause() instanceof ConnectionException) {

                LOGGER.error("Database exception occured stopping the container");
                super.handle(thrownException, records, consumer, container);

            } else {
                //send email about error without discarding the records
                this.stceh.handle(thrownException, records, consumer, container);
            }

        }
    }
}