spring 集成中的错误处理 - 如何获取多个线程中抛出的所有错误并将它们发送到错误通道
Error handling in spring integration - How to get all the errors thrown in multiple threads and send them to the error-channel
我有两个线程,线程中的每个方法都抛出异常。我如何获得每个线程中抛出的所有错误?在此代码中,错误通道仅捕获其中一个错误。基本上我的目标是捕获所有错误并将它们发送给调用者(休息控制器)。任何帮助将不胜感激。谢谢
Integration.java
public IntegrationFlow provisionUserFlow() {
return IntegrationFlows.from("input.channel")
.publishSubscribeChannel(Executors.newCachedThreadPool(),
s -> s
.subscribe(f -> f.handle(provisionerA, "provision"))
.subscribe(f -> f.handle(provisionerB, "provision"))
.get();
}
@ServiceActivator( inputChannel = "errorChannel", outputChannel = "replyChannel")
public boolean processErrors(Exception message) throws RuntimeException{
System.out.println("Message" + message.getMessage());
System.out.println ("******************************");
throw new RuntimeException(message.getMessage());
}
MGateway.java
@MessagingGateway(errorChannel = "errorChannel")
public interface MGateway {
@Gateway(requestChannel = "input.channel", replyChannel = "replyChannel")
boolean invokeProvisioner(User user);
}
解决方案
@Bean
public IntegrationFlow provisionUserFlow() {
return
IntegrationFlows.from("input.channel")
.publishSubscribeChannel(Executors.newCachedThreadPool(),
s -> s.applySequence(true)
.subscribe(f -> f.enrichHeaders(e -> e.header(MessageHeaders.ERROR_CHANNEL, "errorChannel", true))
.handle(provisionerA, "provision")
.channel("aggregatorChannel")
)
.subscribe(f -> f.enrichHeaders(e -> e.header(MessageHeaders.ERROR_CHANNEL, "errorChannel", true))
.handle(provisionerB, "provision")
.channel("aggregatorChannel"))
)
.get();
}
@Bean
public IntegrationFlow aggregateFlow() {
return IntegrationFlows.from("aggregatorChannel")
.channel( aggregatorChannel)
.aggregate( a -> a.processor( collect, "aggregatingMethod"))
.get();
}
@Transformer( inputChannel = "errorChannel", outputChannel = "aggregatorChannel")
public Message<?> errorChannelHandler(ErrorMessage errorMessage) throws RuntimeException {
Message<?> failedMessage = ((MessagingException) errorMessage.getPayload())
.getFailedMessage();
Exception exception = (Exception) errorMessage.getPayload();
return MessageBuilder.withPayload( exception.getMessage())
.copyHeadersIfAbsent( failedMessage.getHeaders() )
.build();
}
你看@Gateway
只是Java方法。它有一个 return 并且可能会抛出一个异常。我仍然很困惑为什么人们认为 Spring 集成的工作方式有些不同。它完全基于 Java 并且没有任何魔法 - 仅调用 java 方法。
现在让我们想象一下,如果您只使用原始 Java 进行开发,您会做什么。是的,您将等待来自两个线程的依赖并为调用者构建一个 return。
我们可以对 Spring 集成做同样的事情。只需要使用 Aggregator
EIP。您可以在该错误通道中捕获错误消息并通过它们的 failedMessage
将它们关联起来。 .publishSubscribeChannel()
可以与选项一起提供:
/**
* Specify whether to apply the sequence number and size headers to the
* messages prior to invoking the subscribed handlers. By default, this
* value is <code>false</code> meaning that sequence headers will
* <em>not</em> be applied. If planning to use an Aggregator downstream
* with the default correlation and completion strategies, you should set
* this flag to <code>true</code>.
* @param applySequence true if the sequence information should be applied.
*/
public void setApplySequence(boolean applySequence) {
默认为 false
。然后聚合器可以只依赖默认 correlationStrategy
并为您收集错误组 return 到 headers.
中的 replyChannel
您可以在参考手册中找到的所有信息:
我有两个线程,线程中的每个方法都抛出异常。我如何获得每个线程中抛出的所有错误?在此代码中,错误通道仅捕获其中一个错误。基本上我的目标是捕获所有错误并将它们发送给调用者(休息控制器)。任何帮助将不胜感激。谢谢
Integration.java
public IntegrationFlow provisionUserFlow() {
return IntegrationFlows.from("input.channel")
.publishSubscribeChannel(Executors.newCachedThreadPool(),
s -> s
.subscribe(f -> f.handle(provisionerA, "provision"))
.subscribe(f -> f.handle(provisionerB, "provision"))
.get();
}
@ServiceActivator( inputChannel = "errorChannel", outputChannel = "replyChannel")
public boolean processErrors(Exception message) throws RuntimeException{
System.out.println("Message" + message.getMessage());
System.out.println ("******************************");
throw new RuntimeException(message.getMessage());
}
MGateway.java
@MessagingGateway(errorChannel = "errorChannel")
public interface MGateway {
@Gateway(requestChannel = "input.channel", replyChannel = "replyChannel")
boolean invokeProvisioner(User user);
}
解决方案
@Bean
public IntegrationFlow provisionUserFlow() {
return
IntegrationFlows.from("input.channel")
.publishSubscribeChannel(Executors.newCachedThreadPool(),
s -> s.applySequence(true)
.subscribe(f -> f.enrichHeaders(e -> e.header(MessageHeaders.ERROR_CHANNEL, "errorChannel", true))
.handle(provisionerA, "provision")
.channel("aggregatorChannel")
)
.subscribe(f -> f.enrichHeaders(e -> e.header(MessageHeaders.ERROR_CHANNEL, "errorChannel", true))
.handle(provisionerB, "provision")
.channel("aggregatorChannel"))
)
.get();
}
@Bean
public IntegrationFlow aggregateFlow() {
return IntegrationFlows.from("aggregatorChannel")
.channel( aggregatorChannel)
.aggregate( a -> a.processor( collect, "aggregatingMethod"))
.get();
}
@Transformer( inputChannel = "errorChannel", outputChannel = "aggregatorChannel")
public Message<?> errorChannelHandler(ErrorMessage errorMessage) throws RuntimeException {
Message<?> failedMessage = ((MessagingException) errorMessage.getPayload())
.getFailedMessage();
Exception exception = (Exception) errorMessage.getPayload();
return MessageBuilder.withPayload( exception.getMessage())
.copyHeadersIfAbsent( failedMessage.getHeaders() )
.build();
}
你看@Gateway
只是Java方法。它有一个 return 并且可能会抛出一个异常。我仍然很困惑为什么人们认为 Spring 集成的工作方式有些不同。它完全基于 Java 并且没有任何魔法 - 仅调用 java 方法。
现在让我们想象一下,如果您只使用原始 Java 进行开发,您会做什么。是的,您将等待来自两个线程的依赖并为调用者构建一个 return。
我们可以对 Spring 集成做同样的事情。只需要使用 Aggregator
EIP。您可以在该错误通道中捕获错误消息并通过它们的 failedMessage
将它们关联起来。 .publishSubscribeChannel()
可以与选项一起提供:
/**
* Specify whether to apply the sequence number and size headers to the
* messages prior to invoking the subscribed handlers. By default, this
* value is <code>false</code> meaning that sequence headers will
* <em>not</em> be applied. If planning to use an Aggregator downstream
* with the default correlation and completion strategies, you should set
* this flag to <code>true</code>.
* @param applySequence true if the sequence information should be applied.
*/
public void setApplySequence(boolean applySequence) {
默认为 false
。然后聚合器可以只依赖默认 correlationStrategy
并为您收集错误组 return 到 headers.
replyChannel
您可以在参考手册中找到的所有信息: