Can I use a spring-kafka RecoveringBatchErrorHandler to handle deserialization exceptions?

I want a batch listener that commits the offsets prior to a failed record, logs the failed record, and then retrieves a new batch starting at the first offset after the failed record.

My current approach handles exceptions thrown in my listener code by throwing a BatchListenerFailedException, which the RecoveringBatchErrorHandler handles as I intend. However, I want to handle all exceptions this way; that is, exceptions thrown by the listener as well as any exceptions resulting from deserialization failures. I am using a BatchMessagingMessageConverter. I know I could use an ErrorHandlingDeserializer if the deserialization exception occurred in a Kafka deserializer; however, with my configuration the deserialization exception occurs in the MessagingMessageConverter, which I believe runs after the Kafka client's BytesDeserializer has successfully deserialized my message. How can I best achieve my goal?

Here is my container factory configuration:

@Bean
public ConcurrentKafkaListenerContainerFactory<?, ?> kafkaListenerContainerFactory(
         ConsumerFactory<Object, Object> consumerFactory) {
    ConcurrentKafkaListenerContainerFactory<Object, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory);
    factory.setBatchListener(true);
    RecoveringBatchErrorHandler errorHandler = new RecoveringBatchErrorHandler(
            new FixedBackOff(FixedBackOff.DEFAULT_INTERVAL, 2)
    );
    factory.setBatchErrorHandler(errorHandler);
    BatchMessagingMessageConverter messageConverter = new BatchMessagingMessageConverter(new BytesJsonMessageConverter());
    factory.setMessageConverter(messageConverter);
    factory.setConcurrency(1);
    return factory;
}

@Bean
public ConsumerFactory<Object, Object> consumerFactory() {
    Map<String, Object> props = new HashMap<>();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka:9092");
    props.put(ConsumerConfig.GROUP_ID_CONFIG, "pojo-group");
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, BytesDeserializer.class);
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, BytesDeserializer.class);
    props.put(ErrorHandlingDeserializer.KEY_DESERIALIZER_CLASS, BytesDeserializer.class);
    props.put(ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS, BytesDeserializer.class);
    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
    return new DefaultKafkaConsumerFactory<>(props);
}

Here is my listener:

@KafkaListener(id = "myKafkaListener", idIsGroup = false, autoStartup = "true", topics = {"pojo-topic"}, containerFactory = "kafkaListenerContainerFactory")
public void receive(List<Message<Pojo>> messages) {
    System.out.println("received " + messages.size() + " messages");
    int i = 0;
    try {
        //exceptions thrown here are handled as I intend
        for (var mm : messages) {
            var m = mm.getPayload();
            System.out.println("received: " + m + " at offset " + mm.getHeaders().get(KafkaHeaders.OFFSET, Long.class));
            i++;
        }
    } catch (Exception e) {
        throw new BatchListenerFailedException("listener threw exception when processing batch", e, i);
    }
}

Update

Here is the stack trace when I send a String (just "A") instead of a JSON object and deserialization fails:

org.springframework.kafka.KafkaException: Seek to current after exception; nested exception is org.springframework.kafka.listener.ListenerExecutionFailedException: Listener failed; nested exception is org.springframework.kafka.support.converter.ConversionException: Failed to convert from JSON; nested exception is com.fasterxml.jackson.core.JsonParseException: Unrecognized token 'A': was expecting (JSON String, Number, Array, Object or token 'null', 'true' or 'false')
 at [Source: (byte[])"A"; line: 1, column: 2]
    at org.springframework.kafka.listener.SeekToCurrentBatchErrorHandler.handle(SeekToCurrentBatchErrorHandler.java:79) ~[spring-kafka-2.7.6.jar:2.7.6]
    at org.springframework.kafka.listener.RecoveringBatchErrorHandler.handle(RecoveringBatchErrorHandler.java:124) ~[spring-kafka-2.7.6.jar:2.7.6]
    at org.springframework.kafka.listener.ContainerAwareBatchErrorHandler.handle(ContainerAwareBatchErrorHandler.java:56) ~[spring-kafka-2.7.6.jar:2.7.6]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeBatchErrorHandler(KafkaMessageListenerContainer.java:2015) ~[spring-kafka-2.7.6.jar:2.7.6]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeBatchListener(KafkaMessageListenerContainer.java:1859) ~[spring-kafka-2.7.6.jar:2.7.6]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeBatchListener(KafkaMessageListenerContainer.java:1725) ~[spring-kafka-2.7.6.jar:2.7.6]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeListener(KafkaMessageListenerContainer.java:1704) ~[spring-kafka-2.7.6.jar:2.7.6]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeIfHaveRecords(KafkaMessageListenerContainer.java:1274) ~[spring-kafka-2.7.6.jar:2.7.6]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:1266) ~[spring-kafka-2.7.6.jar:2.7.6]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1161) ~[spring-kafka-2.7.6.jar:2.7.6]
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) ~[na:na]
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264) ~[na:na]
    at java.base/java.lang.Thread.run(Thread.java:832) ~[na:na]
Caused by: org.springframework.kafka.listener.ListenerExecutionFailedException: Listener failed; nested exception is org.springframework.kafka.support.converter.ConversionException: Failed to convert from JSON; nested exception is com.fasterxml.jackson.core.JsonParseException: Unrecognized token 'A': was expecting (JSON String, Number, Array, Object or token 'null', 'true' or 'false')
 at [Source: (byte[])"A"; line: 1, column: 2]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.decorateException(KafkaMessageListenerContainer.java:2376) ~[spring-kafka-2.7.6.jar:2.7.6]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeBatchOnMessage(KafkaMessageListenerContainer.java:2008) ~[spring-kafka-2.7.6.jar:2.7.6]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeBatchOnMessageWithRecordsOrList(KafkaMessageListenerContainer.java:1978) ~[spring-kafka-2.7.6.jar:2.7.6]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeBatchOnMessage(KafkaMessageListenerContainer.java:1930) ~[spring-kafka-2.7.6.jar:2.7.6]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeBatchListener(KafkaMessageListenerContainer.java:1842) ~[spring-kafka-2.7.6.jar:2.7.6]
    ... 8 common frames omitted
Caused by: org.springframework.kafka.support.converter.ConversionException: Failed to convert from JSON; nested exception is com.fasterxml.jackson.core.JsonParseException: Unrecognized token 'A': was expecting (JSON String, Number, Array, Object or token 'null', 'true' or 'false')
 at [Source: (byte[])"A"; line: 1, column: 2]
    at org.springframework.kafka.support.converter.JsonMessageConverter.extractAndConvertValue(JsonMessageConverter.java:122) ~[spring-kafka-2.7.6.jar:2.7.6]
    at org.springframework.kafka.support.converter.MessagingMessageConverter.toMessage(MessagingMessageConverter.java:174) ~[spring-kafka-2.7.6.jar:2.7.6]
    at org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter.toMessagingMessage(MessagingMessageListenerAdapter.java:322) ~[spring-kafka-2.7.6.jar:2.7.6]
    at org.springframework.kafka.listener.adapter.BatchMessagingMessageListenerAdapter.onMessage(BatchMessagingMessageListenerAdapter.java:153) ~[spring-kafka-2.7.6.jar:2.7.6]
    at org.springframework.kafka.listener.adapter.BatchMessagingMessageListenerAdapter.onMessage(BatchMessagingMessageListenerAdapter.java:61) ~[spring-kafka-2.7.6.jar:2.7.6]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeBatchOnMessage(KafkaMessageListenerContainer.java:1988) ~[spring-kafka-2.7.6.jar:2.7.6]
    ... 11 common frames omitted
Caused by: com.fasterxml.jackson.core.JsonParseException: Unrecognized token 'A': was expecting (JSON String, Number, Array, Object or token 'null', 'true' or 'false')
 at [Source: (byte[])"A"; line: 1, column: 2]
    at com.fasterxml.jackson.core.JsonParser._constructError(JsonParser.java:2337) ~[jackson-core-2.12.4.jar:2.12.4]
    at com.fasterxml.jackson.core.base.ParserMinimalBase._reportError(ParserMinimalBase.java:720) ~[jackson-core-2.12.4.jar:2.12.4]
    at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._reportInvalidToken(UTF8StreamJsonParser.java:3593) ~[jackson-core-2.12.4.jar:2.12.4]
    at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._handleUnexpectedValue(UTF8StreamJsonParser.java:2688) ~[jackson-core-2.12.4.jar:2.12.4]
    at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._nextTokenNotInObject(UTF8StreamJsonParser.java:870) ~[jackson-core-2.12.4.jar:2.12.4]
    at com.fasterxml.jackson.core.json.UTF8StreamJsonParser.nextToken(UTF8StreamJsonParser.java:762) ~[jackson-core-2.12.4.jar:2.12.4]
    at com.fasterxml.jackson.databind.ObjectMapper._initForReading(ObjectMapper.java:4684) ~[jackson-databind-2.12.4.jar:2.12.4]
    at com.fasterxml.jackson.databind.ObjectMapper._readMapAndClose(ObjectMapper.java:4586) ~[jackson-databind-2.12.4.jar:2.12.4]
    at com.fasterxml.jackson.databind.ObjectMapper.readValue(ObjectMapper.java:3643) ~[jackson-databind-2.12.4.jar:2.12.4]
    at org.springframework.kafka.support.converter.JsonMessageConverter.extractAndConvertValue(JsonMessageConverter.java:119) ~[spring-kafka-2.7.6.jar:2.7.6]
    ... 16 common frames omitted

There are two solutions here; the first uses an ErrorHandlingDeserializer with a JsonDeserializer delegate. The second is a work-around using a custom ByteArrayJsonMessageConverter; I have opened an issue to provide a more seamless solution in the batch listener adapter.

Example 1, using the deserializer:

spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.ErrorHandlingDeserializer
spring.kafka.consumer.properties.spring.json.trusted.packages=*
spring.kafka.consumer.properties.spring.json.value.default.type=com.example.demo.Foo
spring.kafka.consumer.properties.spring.deserializer.value.delegate.class=org.springframework.kafka.support.serializer.JsonDeserializer

spring.kafka.listener.type=batch

@SpringBootApplication
public class So69055510Application {

    public static void main(String[] args) {
        SpringApplication.run(So69055510Application.class, args);
    }

    @Bean
    NewTopic topic() {
        return TopicBuilder.name("so69055510").partitions(1).replicas(1).build();
    }

    @Bean
    NewTopic dlt() {
        return TopicBuilder.name("so69055510.DLT").partitions(1).replicas(1).build();
    }

    @KafkaListener(id = "so69055510", topics = "so69055510")
    void listen(List<Foo> in, @Header(KafkaHeaders.BATCH_CONVERTED_HEADERS) List<Map<String, Object>> headers) {
        for (int i = 0; i < in.size(); i++) {
            Foo foo = in.get(i);
            if (foo == null
                    && headers.get(i).get(ErrorHandlingDeserializer.VALUE_DESERIALIZER_EXCEPTION_HEADER) != null) {

                throw new BatchListenerFailedException("deserialization error",
                        new DeserializationException("Batch listener", null, false, null), i);
            }
            System.out.println(foo);
        }
    }

    @KafkaListener(id = "so69055510.DLT", topics = "so69055510.DLT",
            properties = ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG +
                ":org.apache.kafka.common.serialization.StringDeserializer")
    void listenDlt(String in) {
        System.out.println("DLT: " + in);
    }

    @Bean
    BatchErrorHandler eh(ProducerFactory<String, byte[]> pf) {
        KafkaTemplate<String, byte[]> template = new KafkaTemplate<>(pf,
                Map.of(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class));
        RecoveringBatchErrorHandler eh = new RecoveringBatchErrorHandler(new DeadLetterPublishingRecoverer(template));
        eh.setLogLevel(Level.DEBUG);
        return eh;
    }

    @Bean
    ApplicationRunner runner(KafkaTemplate<String, String> template) {
        return args -> {
            template.send("so69055510", "{\"bar\":\"baz\"}");
            template.send("so69055510", "JUNK");
            template.send("so69055510", "{\"bar\":\"qux\"}");
        };
    }

}

Foo [bar=baz]
DLT: JUNK
Foo [bar=qux]

Example 2, using a custom message converter. Note that, for this work-around, you need some way to signal a deserialization failure in the domain object:

spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.ByteArrayDeserializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.ByteArraySerializer

@SpringBootApplication
public class So69055510Application {

    public static void main(String[] args) {
        SpringApplication.run(So69055510Application.class, args);
    }

    @Bean
    NewTopic topic() {
        return TopicBuilder.name("so69055510").partitions(1).replicas(1).build();
    }

    @Bean
    NewTopic dlt() {
        return TopicBuilder.name("so69055510.DLT").partitions(1).replicas(1).build();
    }

    @KafkaListener(id = "so69055510", topics = "so69055510")
    void listen(List<Foo> in) {
        for (int i = 0; i < in.size(); i++) {
            Foo foo = in.get(i);
            if (foo.getBar().equals("thisIsABadOne")) {
                throw new BatchListenerFailedException("deserialization error",
                        new DeserializationException("Batch listener", null, false, null), i);
            }
            System.out.println(foo);
        }
    }

    @KafkaListener(id = "so69055510.DLT", topics = "so69055510.DLT")
    void listenDlt(String in) {
        System.out.println("DLT: " + in);
    }

    @Bean
    ByteArrayJsonMessageConverter converter() {
        return new ByteArrayJsonMessageConverter() {

            @Override
            public Message<?> toMessage(ConsumerRecord<?, ?> record, Acknowledgment acknowledgment,
                    Consumer<?, ?> consumer, Type type) {

                try {
                    return super.toMessage(record, acknowledgment, consumer, Foo.class); // <<<<<< type
                }
                catch (ConversionException ex) {
                    return MessageBuilder.withPayload(new Foo("thisIsABadOne"))
                            .build();
                }
            }

        };
    }

    @Bean
    BatchErrorHandler eh(KafkaTemplate<String, byte[]> template) {
        RecoveringBatchErrorHandler eh = new RecoveringBatchErrorHandler(new DeadLetterPublishingRecoverer(template));
        eh.setLogLevel(Level.DEBUG);
        return eh;
    }

    @Bean
    ApplicationRunner runner(KafkaTemplate<String, byte[]> template) {
        return args -> {
            template.send("so69055510", "{\"bar\":\"baz\"}".getBytes());
            template.send("so69055510", "JUNK".getBytes());
            template.send("so69055510", "{\"bar\":\"qux\"}".getBytes());
        };
    }

}

Foo [bar=baz]
DLT: JUNK
Foo [bar=qux]
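Both examples assume a simple Foo domain class that is not shown above. A minimal sketch consistent with the printed output (the class name, field name, and toString format are inferred from the examples, not taken from the original code):

```java
// Hypothetical Foo POJO assumed by both examples above.
// Jackson (used by JsonDeserializer/ByteArrayJsonMessageConverter)
// needs a no-arg constructor plus a setter, or field access.
class Foo {

    private String bar;

    public Foo() {
    }

    public Foo(String bar) {
        this.bar = bar;
    }

    public String getBar() {
        return this.bar;
    }

    public void setBar(String bar) {
        this.bar = bar;
    }

    @Override
    public String toString() {
        // Matches the "Foo [bar=baz]" lines in the example output
        return "Foo [bar=" + this.bar + "]";
    }

}
```

With this class, the work-around in example 2 can construct a marker instance such as `new Foo("thisIsABadOne")` when conversion fails, which the listener then detects.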