DeadLetterPublishingRecoverer 未在 .DLT 主题上发布 "original payload"

DeadLetterPublishingRecoverer not publishing the "original payload" on the .DLT topic

我正在使用 spring-kafka-2.2.9.RELEASEkafka_2.12-2.3.0。 我一直在尝试获取 .DLT 主题中的 original payload,但我得到的只是 "null"。我确信这可以使用 ErrorHandlingDeserializer2SeekToCurrentErrorHandlerDeadLetterPublishingRecoverer 来完成,但我不确定我缺少什么。

生产者与消费者

Producer:
    @Autowired
    private ObjectMapper objectMapper;

    @Bean
    public ProducerFactory<Object, Object> producerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put("acks","all");

        return new DefaultKafkaProducerFactory<Object, Object>(props, new JsonSerializer<Object>(objectMapper), new JsonSerializer<>());
        //return new DefaultKafkaProducerFactory<Object, Object>(props, new JsonSerializer<Object>(objectMapper), new JsonSerializer<Object>(objectMapper));
    }

    @Bean
    public KafkaTemplate<Object, Object> kafkaTemplate() {
        return new KafkaTemplate<Object, Object>(producerFactory());
    }

Consumer:
@Autowired
    private KafkaTemplate<Object, Object> kafkaTemplate;

    @Autowired
    private ObjectMapper objectMapper;

    @Bean
    public ConsumerFactory<Object, Object> consumerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");

        JsonDeserializer<Object> jsonDeserializer = new JsonDeserializer<>(objectMapper);
        jsonDeserializer.addTrustedPackages("*");
        ErrorHandlingDeserializer2<Object> errorHandlingDeserializerKey = new ErrorHandlingDeserializer2<>(jsonDeserializer);
        ErrorHandlingDeserializer2<Object> errorHandlingDeserializerValue = new ErrorHandlingDeserializer2<>(jsonDeserializer);     

        return new DefaultKafkaConsumerFactory<>(props, errorHandlingDeserializerKey, errorHandlingDeserializerValue);
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, Object> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());

        factory.setErrorHandler(new SeekToCurrentErrorHandler(new DeadLetterPublishingRecoverer(kafkaTemplate), 3));

        return factory;
    }

我在 topic.DLT 中看到 "null":

kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic topic.DLT --from-beginning
null
null

在此先感谢您的帮助。

ErrorHandlingDeserializer2中有这样的逻辑:

private T recoverFromSupplier(String topic, Headers headers, byte[] data, Exception exception) {
    if (this.failedDeserializationFunction != null) {
        FailedDeserializationInfo failedDeserializationInfo =
                new FailedDeserializationInfo(topic, headers, data, this.isForKey, exception);
        return this.failedDeserializationFunction.apply(failedDeserializationInfo);
    }
    else {
        return null;
    }
}

因此,如果未提供 failedDeserializationFunction,反序列化数据将返回为 null

有关详细信息,请参阅文档:https://docs.spring.io/spring-kafka/docs/2.3.1.RELEASE/reference/html/#error-handling-deserializer

If the delegate fails to deserialize the record content, the ErrorHandlingDeserializer2 returns a null value and a DeserializationException in a header that contains the cause and the raw bytes.

您目前无法使用 2.@KafkaListener 中的 DLT 记录(由于序列化异常而发布)。2.x 因为检测到 DeserializationException header由容器将 DLT 记录本身发送到错误处理程序。

我刚刚 fixed this,它将在 2.2.11 中可用。

但是,如果您使用正则 KafkaConsumer,您可以使用此代码获取原始值...

Header exHeader = record.headers().lastHeader(ErrorHandlingDeserializer2.VALUE_DESERIALIZER_EXCEPTION_HEADER);
DeserializationException ex = (DeserializationException) new ObjectInputStream(
        new ByteArrayInputStream(exHeader.value())).readObject();
System.out.println("DLT: " + new String(ex.getData()));

注意:如果您使用 Artem 建议的函数,记录将转到您的主侦听器,而不是错误处理程序,因此您必须在那里处理它。因此,您将需要某种包含原始有效负载的虚拟值。

如果能升级到2.3.1肯定更简单