将无法反序列化的消息发布到 DLT 主题

Publish messages that could not be de-serialized to DLT topic

我不明白如何使用 spring kafka 将无法反序列化的消息写入 DLT 主题。

我根据 spring kafka docs 配置了消费者,这适用于消息反序列化后发生的异常。

但是当消息不可反序列化时,在轮询消息时抛出 org.apache.kafka.common.errors.SerializationException

随后,SeekToCurrentErrorHandler.handle(Exception thrownException, List<ConsumerRecord<?, ?>> records, ...) 被调用并出现此异常,但记录列表为空,因此无法向 DLT 主题写入内容。

如何将这些消息也写入 DLT 主题?

问题是异常是由 Kafka 客户端本身抛出的,所以 Spring 看不到失败的实际记录。

这就是我们添加 ErrorHandlingDeserializer2 的原因,它可用于包装实际的反序列化器;失败被传递给侦听器容器并作为 DeserializationException.

重新抛出

the documentation

When a deserializer fails to deserialize a message, Spring has no way to handle the problem, because it occurs before the poll() returns. To solve this problem, version 2.2 introduced the ErrorHandlingDeserializer2. This deserializer delegates to a real deserializer (key or value). If the delegate fails to deserialize the record content, the ErrorHandlingDeserializer2 returns a null value and a DeserializationException in a header that contains the cause and the raw bytes. When you use a record-level MessageListener, if the ConsumerRecord contains a DeserializationException header for either the key or value, the container’s ErrorHandler is called with the failed ConsumerRecord. The record is not passed to the listener.

DeadLetterPublishingRecoverer 具有检测异常并发布失败记录的逻辑。