如何从 Spring Cloud Stream 中的 Spring 集成错误通道捕获错误?
How to catch errors from the Spring Integration error channel inside Spring Cloud Stream?
我正在尝试为使用 Kafka 作为消息代理的 SCS 应用程序内部处理期间的故障创建一个应用程序级错误处理程序。我知道 SCS 已经提供了 DLQ 功能,但在我的例子中,我想用自定义包装器类型包装失败的消息(提供失败上下文(来源、原因等))
在https://github.com/qabbasi/Spring-Cloud-Stream-DLQ-Error-Handling中,您可以看到针对此场景的两种方法:一种是使用SCS,另一种是直接Spring集成。 (两个都是 atm 不工作)
根据当前参考 (https://docs.spring.io/spring-cloud-stream/docs/current/reference/htmlsingle/#_producing_and_consuming_messages),SCS 将允许发布从 Spring 集成错误通道收到的错误消息,但不幸的是,情况并非如此,至少对我而言。尽管应用程序在启动时记录了以下内容
o.s.i.channel.PublishSubscribeChannel : Channel 'application.errorChannel' has 2 subscriber(s).
你不应该使用 @StreamListener("errorChannel")
- 它是从活页夹目的地消费的;要捕获发送到 errorChannel
的消息,请使用 @ServiceActivator(inputChannel = "errorChannel")
.
编辑
您的应用存在多个问题...
- 在 1.3 版本中添加了新的错误处理代码
autoCommitOnError
是一个 kafka 活页夹 属性
- 你需要一个
@EnableBinding(CustomDlqMessageChannel.class)
- 你真的不需要
@EnableIntegration
- boot 会为你做这些
和...
$ kafka-console-producer --broker-list localhost:9092 --topic testIn
>foo
和...
$ kafka-console-consumer --bootstrap-server localhost:9092 --topic customDlqTopic --from-beginning
?
contentType>"application/x-java-object;type=com.example.demo.ErrorWrapper"
我正在尝试为使用 Kafka 作为消息代理的 SCS 应用程序内部处理期间的故障创建一个应用程序级错误处理程序。我知道 SCS 已经提供了 DLQ 功能,但在我的例子中,我想用自定义包装器类型包装失败的消息(提供失败上下文(来源、原因等))
在https://github.com/qabbasi/Spring-Cloud-Stream-DLQ-Error-Handling中,您可以看到针对此场景的两种方法:一种是使用SCS,另一种是直接Spring集成。 (两个都是 atm 不工作)
根据当前参考 (https://docs.spring.io/spring-cloud-stream/docs/current/reference/htmlsingle/#_producing_and_consuming_messages),SCS 将允许发布从 Spring 集成错误通道收到的错误消息,但不幸的是,情况并非如此,至少对我而言。尽管应用程序在启动时记录了以下内容
o.s.i.channel.PublishSubscribeChannel : Channel 'application.errorChannel' has 2 subscriber(s).
你不应该使用 @StreamListener("errorChannel")
- 它是从活页夹目的地消费的;要捕获发送到 errorChannel
的消息,请使用 @ServiceActivator(inputChannel = "errorChannel")
.
编辑
您的应用存在多个问题...
- 在 1.3 版本中添加了新的错误处理代码
autoCommitOnError
是一个 kafka 活页夹 属性- 你需要一个
@EnableBinding(CustomDlqMessageChannel.class)
- 你真的不需要
@EnableIntegration
- boot 会为你做这些
和...
$ kafka-console-producer --broker-list localhost:9092 --topic testIn
>foo
和...
$ kafka-console-consumer --bootstrap-server localhost:9092 --topic customDlqTopic --from-beginning
?
contentType>"application/x-java-object;type=com.example.demo.ErrorWrapper"