如何使用 Spring Cloud Stream 处理反序列化异常并转换为新模式?
How to Handle Deserialization Exception & Converting to New Schema with Spring Cloud Stream?
我无法理解如何正确处理 Spring 云流中的反序列化异常。主要是因为实施的框架不支持 headers 并且 DLQ 应该是与原始消息不同的模式。所以流程需要是:消费消息->反序列化错误->DlqHandler->用NEW schema序列化->发送到DLQ
下面链接的文档并没有给出这是否可能的好主意。我已经看到了很多 Spring-Kafka 的 SeekToCurrentErrorHandler 示例,但据我所知,这些是不同的实现,与我如何正确获取反序列化错误然后有一个自定义代码部分序列化为新格式不匹配然后从那里移动。
我的主要问题是:是否可以使用 spring 云流 (kafka) 捕获反序列化异常并重新序列化?
是,但不使用绑定重试或 DLQ 属性。
相反,添加一个 ListenerContainerCustomizer
bean 并使用为您需要的重试配置的 SeekToCurrentErrorHandler
自定义绑定的侦听器容器,并且可能使用适当的 DeadLetterPublishingRecoverer
的子类配置 KafkaTemplate
并可能覆盖 createProducerRecord
方法。
我无法理解如何正确处理 Spring 云流中的反序列化异常。主要是因为实施的框架不支持 headers 并且 DLQ 应该是与原始消息不同的模式。所以流程需要是:消费消息->反序列化错误->DlqHandler->用NEW schema序列化->发送到DLQ
下面链接的文档并没有给出这是否可能的好主意。我已经看到了很多 Spring-Kafka 的 SeekToCurrentErrorHandler 示例,但据我所知,这些是不同的实现,与我如何正确获取反序列化错误然后有一个自定义代码部分序列化为新格式不匹配然后从那里移动。
我的主要问题是:是否可以使用 spring 云流 (kafka) 捕获反序列化异常并重新序列化?
是,但不使用绑定重试或 DLQ 属性。
相反,添加一个 ListenerContainerCustomizer
bean 并使用为您需要的重试配置的 SeekToCurrentErrorHandler
自定义绑定的侦听器容器,并且可能使用适当的 DeadLetterPublishingRecoverer
的子类配置 KafkaTemplate
并可能覆盖 createProducerRecord
方法。