将无法反序列化的消息发布到 DLT 主题
Publish messages that could not be de-serialized to DLT topic
我不明白如何使用 spring kafka 将无法反序列化的消息写入 DLT 主题。
我根据 spring kafka docs 配置了消费者,这适用于消息反序列化后发生的异常。
但是当消息不可反序列化时,在轮询消息时抛出 org.apache.kafka.common.errors.SerializationException
。
随后,SeekToCurrentErrorHandler.handle(Exception thrownException, List<ConsumerRecord<?, ?>> records, ...)
被调用并出现此异常,但记录列表为空,因此无法向 DLT 主题写入内容。
如何将这些消息也写入 DLT 主题?
问题是异常是由 Kafka 客户端本身抛出的,所以 Spring 看不到失败的实际记录。
这就是我们添加 ErrorHandlingDeserializer2
的原因,它可用于包装实际的反序列化器;失败被传递给侦听器容器并作为 DeserializationException
.
重新抛出
When a deserializer fails to deserialize a message, Spring has no way to handle the problem, because it occurs before the poll() returns. To solve this problem, version 2.2 introduced the ErrorHandlingDeserializer2. This deserializer delegates to a real deserializer (key or value). If the delegate fails to deserialize the record content, the ErrorHandlingDeserializer2 returns a null value and a DeserializationException in a header that contains the cause and the raw bytes. When you use a record-level MessageListener, if the ConsumerRecord contains a DeserializationException header for either the key or value, the container’s ErrorHandler is called with the failed ConsumerRecord. The record is not passed to the listener.
DeadLetterPublishingRecoverer
具有检测异常并发布失败记录的逻辑。
我不明白如何使用 spring kafka 将无法反序列化的消息写入 DLT 主题。
我根据 spring kafka docs 配置了消费者,这适用于消息反序列化后发生的异常。
但是当消息不可反序列化时,在轮询消息时抛出 org.apache.kafka.common.errors.SerializationException
。
随后,SeekToCurrentErrorHandler.handle(Exception thrownException, List<ConsumerRecord<?, ?>> records, ...)
被调用并出现此异常,但记录列表为空,因此无法向 DLT 主题写入内容。
如何将这些消息也写入 DLT 主题?
问题是异常是由 Kafka 客户端本身抛出的,所以 Spring 看不到失败的实际记录。
这就是我们添加 ErrorHandlingDeserializer2
的原因,它可用于包装实际的反序列化器;失败被传递给侦听器容器并作为 DeserializationException
.
When a deserializer fails to deserialize a message, Spring has no way to handle the problem, because it occurs before the poll() returns. To solve this problem, version 2.2 introduced the ErrorHandlingDeserializer2. This deserializer delegates to a real deserializer (key or value). If the delegate fails to deserialize the record content, the ErrorHandlingDeserializer2 returns a null value and a DeserializationException in a header that contains the cause and the raw bytes. When you use a record-level MessageListener, if the ConsumerRecord contains a DeserializationException header for either the key or value, the container’s ErrorHandler is called with the failed ConsumerRecord. The record is not passed to the listener.
DeadLetterPublishingRecoverer
具有检测异常并发布失败记录的逻辑。