如何 Handle/Intercept 在 Spring 云流中抛出异常 (Elmhurst/2.0)
How to Handle/Intercept Exceptions Thrown in Spring Cloud Streams (Elmhurst/2.0)
我目前在处理 Spring Cloud Streams (Elmhurst.RELEASE) 消息处理期间抛出的异常时遇到问题。当我的应用在主处理方法中抛出异常时:
@SpringBootApplication
@EnableBinding(Processor.class)
public class DemoApplication {
public static void main(String[] args) {
SpringApplication.run(DemoApplication.class, args);
}
@StreamListener(Processor.INPUT)
@SendTo(Processor.OUTPUT)
public String process(String message) {
externalService();
return message.toUpperCase();
}
private void externalService() {
throw new RuntimeException("An external call failed");
}
}
我似乎完全无法在任何错误通道上捕捉到这个异常。通过阅读文档,我预计我将能够在全局 "errorChannel" 或特定 "input.myGroup.errors" 频道上收到错误消息。
我尝试使用 Spring 集成侦听器:
@ServiceActivator(inputChannel="errorChannel")
public ErrorMessage onError(ErrorMessage message) {
return message;
}
以及 Spring 云流听众:
@StreamListener("errorChannel")
public ErrorMessage onError(ErrorMessage message) {
return message;
}
没有成功。我还玩过各种配置设置组合,如 "errorChannelEnabled: true"、"max-retries: 1" 等。对我能够捕获处理器抛出的错误的影响为零(在 [=48= 上使用调试点) ] 去检查)。按照 SCS 文档中的建议 "bindings.error.destination: myErrorTopic" 配置时,我什至没有在主题中收到任何错误消息。
唯一似乎有效的是活页夹配置级别的 "enableDlq: true"。然而,这并不能满足能够确定原始异常的重要需求,因为发布到 DLQ 的内容只有 header 和完整的堆栈跟踪。我不想解析堆栈跟踪来找出原始异常的类型或消息。
我应该在这里采用更好的方法吗?还是我犯了一些愚蠢的错误?我的总体目标是将具有实际异常类型的异常消息和异常消息发送到 DLQ。
当然我可以在每个 Processor/Source/Sink 方法周围放置 try/catch 语句,然后手动路由到不同的绑定通道,但这大大降低了 SCS 框架在我的想法。
我发现了 this example of custom DLQ handling as part of 较早的 Whosebug 问题,这似乎可以满足我的需求。然而,这似乎根本不适用于 SCS Elmhurst/2.0 和 Spring Kafka 活页夹,即使在迁移配置以适用于更新的 SCS 之后也是如此。
EDIT 我添加了一个 Github repository 重现我的错误,因为复制与 Gary 的答案相同的核心代码似乎不起作用。我开始怀疑这是否是 POM 依赖、配置或 Kafka 活页夹问题。这个 repo 使用与 Gary 的答案相同的核心代码,因为我认为查看问题和调试更简单一些。
在 Gary 的回答后进行编辑 我接受了 Gary 的回答,因为它解决了我原来的问题(当配置中有活页夹环境时解决当前框架问题)。然而,DLQ 消息最终对我的情况毫无帮助。根据 Gary 的回答,我最终订阅了 "errorChannel",并从那里创建了一条自定义错误消息,我将其发送到正常绑定的 SCS 频道。
为什么你有return message;
?你希望它去哪里?
这对我来说很好...
@SpringBootApplication
@EnableBinding(Sink.class)
public class So50506586Application {
public static void main(String[] args) {
SpringApplication.run(So50506586Application.class, args);
}
@StreamListener(Sink.INPUT)
public void listen(byte[] in) {
throw new RuntimeException("fail");
}
@ServiceActivator(inputChannel = "errorChannel")
public void errors(ErrorMessage em) {
System.out.println(em);
}
}
ErrorMessage [payload=org.springframework.messaging.MessagingException: Exception thrown while invoking com.example.So50506586Application#listen[1 args]; nested exception is java.lang.RuntimeException: fail, failedMessage=GenericMessage [payload=byte[3], headers={kafka_offset=0, ...
...
Caused by: java.lang.RuntimeException: fail
...
编辑
这是使用 multi-binder 支持(通过 environment
的东西)时的错误。全局错误通道不可见。
这有效...
spring:
cloud:
stream:
bindings:
input:
destination: input
group: myGroup7
binder: kafka
# error:
# destination: errors
kafka.bindings.input.consumer.autoCommitOnError: true
kafka:
binder:
brokers: localhost:9092
(您不再需要带有 Elmhurst 的 zkNodes)。
此外,您不应该使用遗留的 error.destination
内容,因为发送到错误通道的消息现在是包含大量信息的 ErrorMessage
object,请使用 dlq自动发布到主题的内容。
我们需要删除那些遗留错误目标属性。
这也不行...
@ServiceActivator(inputChannel = "input.myGroup.errors")
public void errors(ErrorMessage em) {
System.out.println(em);
}
...因为服务激活器与绑定处于不同的应用程序上下文中。
目前,唯一的 work-around 是不使用 environment
风格的配置。如果您需要多个活页夹支持,我没有适合您的解决方案。
顺便说一句,使用 Elmhurst(和 kafka 0.11 或更高版本),堆栈跟踪不再嵌入到发送到 DLQ 的消息的有效负载中,它作为 kafka header.
发送
我目前在处理 Spring Cloud Streams (Elmhurst.RELEASE) 消息处理期间抛出的异常时遇到问题。当我的应用在主处理方法中抛出异常时:
@SpringBootApplication
@EnableBinding(Processor.class)
public class DemoApplication {
public static void main(String[] args) {
SpringApplication.run(DemoApplication.class, args);
}
@StreamListener(Processor.INPUT)
@SendTo(Processor.OUTPUT)
public String process(String message) {
externalService();
return message.toUpperCase();
}
private void externalService() {
throw new RuntimeException("An external call failed");
}
}
我似乎完全无法在任何错误通道上捕捉到这个异常。通过阅读文档,我预计我将能够在全局 "errorChannel" 或特定 "input.myGroup.errors" 频道上收到错误消息。
我尝试使用 Spring 集成侦听器:
@ServiceActivator(inputChannel="errorChannel")
public ErrorMessage onError(ErrorMessage message) {
return message;
}
以及 Spring 云流听众:
@StreamListener("errorChannel")
public ErrorMessage onError(ErrorMessage message) {
return message;
}
没有成功。我还玩过各种配置设置组合,如 "errorChannelEnabled: true"、"max-retries: 1" 等。对我能够捕获处理器抛出的错误的影响为零(在 [=48= 上使用调试点) ] 去检查)。按照 SCS 文档中的建议 "bindings.error.destination: myErrorTopic" 配置时,我什至没有在主题中收到任何错误消息。
唯一似乎有效的是活页夹配置级别的 "enableDlq: true"。然而,这并不能满足能够确定原始异常的重要需求,因为发布到 DLQ 的内容只有 header 和完整的堆栈跟踪。我不想解析堆栈跟踪来找出原始异常的类型或消息。
我应该在这里采用更好的方法吗?还是我犯了一些愚蠢的错误?我的总体目标是将具有实际异常类型的异常消息和异常消息发送到 DLQ。
当然我可以在每个 Processor/Source/Sink 方法周围放置 try/catch 语句,然后手动路由到不同的绑定通道,但这大大降低了 SCS 框架在我的想法。
我发现了 this example of custom DLQ handling as part of
EDIT 我添加了一个 Github repository 重现我的错误,因为复制与 Gary 的答案相同的核心代码似乎不起作用。我开始怀疑这是否是 POM 依赖、配置或 Kafka 活页夹问题。这个 repo 使用与 Gary 的答案相同的核心代码,因为我认为查看问题和调试更简单一些。
在 Gary 的回答后进行编辑 我接受了 Gary 的回答,因为它解决了我原来的问题(当配置中有活页夹环境时解决当前框架问题)。然而,DLQ 消息最终对我的情况毫无帮助。根据 Gary 的回答,我最终订阅了 "errorChannel",并从那里创建了一条自定义错误消息,我将其发送到正常绑定的 SCS 频道。
为什么你有return message;
?你希望它去哪里?
这对我来说很好...
@SpringBootApplication
@EnableBinding(Sink.class)
public class So50506586Application {
public static void main(String[] args) {
SpringApplication.run(So50506586Application.class, args);
}
@StreamListener(Sink.INPUT)
public void listen(byte[] in) {
throw new RuntimeException("fail");
}
@ServiceActivator(inputChannel = "errorChannel")
public void errors(ErrorMessage em) {
System.out.println(em);
}
}
ErrorMessage [payload=org.springframework.messaging.MessagingException: Exception thrown while invoking com.example.So50506586Application#listen[1 args]; nested exception is java.lang.RuntimeException: fail, failedMessage=GenericMessage [payload=byte[3], headers={kafka_offset=0, ...
...
Caused by: java.lang.RuntimeException: fail
...
编辑
这是使用 multi-binder 支持(通过 environment
的东西)时的错误。全局错误通道不可见。
这有效...
spring:
cloud:
stream:
bindings:
input:
destination: input
group: myGroup7
binder: kafka
# error:
# destination: errors
kafka.bindings.input.consumer.autoCommitOnError: true
kafka:
binder:
brokers: localhost:9092
(您不再需要带有 Elmhurst 的 zkNodes)。
此外,您不应该使用遗留的 error.destination
内容,因为发送到错误通道的消息现在是包含大量信息的 ErrorMessage
object,请使用 dlq自动发布到主题的内容。
我们需要删除那些遗留错误目标属性。
这也不行...
@ServiceActivator(inputChannel = "input.myGroup.errors")
public void errors(ErrorMessage em) {
System.out.println(em);
}
...因为服务激活器与绑定处于不同的应用程序上下文中。
目前,唯一的 work-around 是不使用 environment
风格的配置。如果您需要多个活页夹支持,我没有适合您的解决方案。
顺便说一句,使用 Elmhurst(和 kafka 0.11 或更高版本),堆栈跟踪不再嵌入到发送到 DLQ 的消息的有效负载中,它作为 kafka header.
发送