Spring 集成:拆分器-聚合器流中错误处理的意外行为
Spring integration: unexpected behavior on error handling in splitter-aggregator flow
我有以下失败的测试。问题是,为什么在其中一个拆分的“子消息”出现错误的情况下回复只是错误,而另一个成功处理的子消息没有结果(如测试中预期的那样)?是否修改此代码以实现测试中的预期结果?
@RunWith(SpringRunner.class)
public class ErrorHandlingTests {
@Autowired
StringsService stringsService;
interface StringsService {
@Nonnull
List<String> process(@Nonnull List<String> data);
}
@EnableIntegration
@Configuration
static class Config {
@Bean
IntegrationFlow errorHandler() {
return IntegrationFlows.from("errorChannel")
.<MessagingException>handle((ex, h) -> "Failure for " + ex.getFailedMessage().getPayload())
.get();
}
@Bean
IntegrationFlow errorsHandlingFlow2() {
AtomicInteger incomingCorrelationId = new AtomicInteger();
return IntegrationFlows.from(StringsService.class, gws -> gws.errorChannel("errorChannel"))
.split(new AbstractMessageSplitter() {
@Override
protected Object splitMessage(Message<?> message) {
List<String> strings = (List<String>) message.getPayload();
int id = incomingCorrelationId.get();
return strings
.stream()
.map(r -> MessageBuilder
.withPayload(r)
.setHeader(IntegrationMessageHeaderAccessor.CORRELATION_ID, id)
.setHeader(IntegrationMessageHeaderAccessor.SEQUENCE_SIZE, strings.size())
.build())
.collect(Collectors.toList());
}
})
.transform(new AbstractPayloadTransformer<String, String>() {
@Override
protected String transformPayload(String s) {
if (s.contains("oops"))
throw new IllegalArgumentException("Bad value");
return "R: " + s;
}
})
.aggregate(a -> a.outputProcessor(new AbstractAggregatingMessageGroupProcessor() {
@Override
protected Object aggregatePayloads(MessageGroup group, Map<String, Object> defaultHeaders) {
return group.getMessages()
.stream()
.map(m -> (String) m.getPayload())
.collect(Collectors.toList());
}
}))
.get();
}
}
@Test
public void testErrorHandlingInFlow2() {
assertEquals(Arrays.asList("R: a", "R: b"), stringsService.process(Arrays.asList("a", "b")));
assertEquals(Arrays.asList("R: a", "Failure for oops"), stringsService.process(Arrays.asList("a", "oops")));
}
}
更新版本,正在运行,已应用建议。
@RunWith(SpringRunner.class)
public class ErrorHandlingTests2 {
interface StringsService {
@Nonnull
List<String> process(@Nonnull List<String> data);
}
@Autowired
StringsService stringsService;
@EnableIntegration
@Configuration
static class Config {
@Bean
IntegrationFlow errorHandler() {
return IntegrationFlows.from("errorChannel")
.<MessagingException>handle((ex, h) -> "Failure for " + ex.getFailedMessage().getPayload())
.get();
}
@Bean
IntegrationFlow errorsHandlingFlow2() {
return IntegrationFlows.from(StringsService.class, gws -> gws.errorChannel("errorChannel"))
.split(new AbstractMessageSplitter() {
@Override
protected Object splitMessage(Message<?> message) {
List<String> strings = (List<String>) message.getPayload();
return strings
.stream()
.map(r -> MessageBuilder
.withPayload(r)
.build())
.collect(Collectors.toList());
}
})
.transform(new AbstractPayloadTransformer<String, String>() {
@Override
protected String transformPayload(String s) {
if (s.contains("oops"))
throw new IllegalArgumentException("Bad value");
return "R: " + s;
}
}, c -> c.advice(advice()))
.aggregate(a -> a.outputProcessor(new AbstractAggregatingMessageGroupProcessor() {
@Override
protected Object aggregatePayloads(MessageGroup group, Map<String, Object> defaultHeaders) {
return group.getMessages()
.stream()
.map(m -> (String) m.getPayload())
.collect(Collectors.toList());
}
}))
.get();
}
@Bean
ExpressionEvaluatingRequestHandlerAdvice advice() {
ExpressionEvaluatingRequestHandlerAdvice advice = new ExpressionEvaluatingRequestHandlerAdvice();
advice.setReturnFailureExpressionResult(true);
advice.setOnFailureExpression(
new FunctionExpression<Message<?>>(s ->
MessageBuilder
.withPayload("Failure for " + s.getPayload())
.copyHeaders(s.getHeaders()).build())
);
return advice;
}
}
@Test
public void testErrorHandlingInFlow2() {
assertEquals(Arrays.asList("R: a", "R: b"), stringsService.process(Arrays.asList("a", "b")));
assertEquals(Arrays.asList("R: a", "Failure for oops", "R: b"), stringsService.process(Arrays.asList("a", "oops", "b")));
}
}
- 分离器支持Java
Stream
.
- 拆分器默认填充那些 headers。不确定为什么需要自定义
IntegrationMessageHeaderAccessor.CORRELATION_ID
。在拆分器中是这样的:final Object correlationId = message.getHeaders().getId();
- 您在
transform()
之后已经有一个聚合器。因此,当 transform 抛出异常时,实际上是它没有到达聚合器。事实上,两者只是彼此不了解。这是 Spring 集成中首批 class 公民功能之一,其中端点是 loosely-couple,中间有消息通道。你可以从不同的地方向同一个端点发送消息。无论如何,我想即使使用普通的 Java 它也会以相同的方式运行:你有一个循环并在循环结束时将数据收集到一个列表中。现在成像你在收集逻辑之前的线上失败了,所以异常被抛给被调用的并且确实没有任何信息是如何收集的。
现在关于您预期逻辑的可能修复。请查看要应用于变压器的 ExpressionEvaluatingRequestHandlerAdvice
。因此,当您遇到异常时,它会在 failureChannel
sub-flow 中进行处理,并且会作为常规回复返回补偿消息,以便与其他消息汇总。有关详细信息,请参阅文档:https://docs.spring.io/spring-integration/docs/current/reference/html/messaging-endpoints.html#message-handler-advice-chain
我有以下失败的测试。问题是,为什么在其中一个拆分的“子消息”出现错误的情况下回复只是错误,而另一个成功处理的子消息没有结果(如测试中预期的那样)?是否修改此代码以实现测试中的预期结果?
@RunWith(SpringRunner.class)
public class ErrorHandlingTests {
@Autowired
StringsService stringsService;
interface StringsService {
@Nonnull
List<String> process(@Nonnull List<String> data);
}
@EnableIntegration
@Configuration
static class Config {
@Bean
IntegrationFlow errorHandler() {
return IntegrationFlows.from("errorChannel")
.<MessagingException>handle((ex, h) -> "Failure for " + ex.getFailedMessage().getPayload())
.get();
}
@Bean
IntegrationFlow errorsHandlingFlow2() {
AtomicInteger incomingCorrelationId = new AtomicInteger();
return IntegrationFlows.from(StringsService.class, gws -> gws.errorChannel("errorChannel"))
.split(new AbstractMessageSplitter() {
@Override
protected Object splitMessage(Message<?> message) {
List<String> strings = (List<String>) message.getPayload();
int id = incomingCorrelationId.get();
return strings
.stream()
.map(r -> MessageBuilder
.withPayload(r)
.setHeader(IntegrationMessageHeaderAccessor.CORRELATION_ID, id)
.setHeader(IntegrationMessageHeaderAccessor.SEQUENCE_SIZE, strings.size())
.build())
.collect(Collectors.toList());
}
})
.transform(new AbstractPayloadTransformer<String, String>() {
@Override
protected String transformPayload(String s) {
if (s.contains("oops"))
throw new IllegalArgumentException("Bad value");
return "R: " + s;
}
})
.aggregate(a -> a.outputProcessor(new AbstractAggregatingMessageGroupProcessor() {
@Override
protected Object aggregatePayloads(MessageGroup group, Map<String, Object> defaultHeaders) {
return group.getMessages()
.stream()
.map(m -> (String) m.getPayload())
.collect(Collectors.toList());
}
}))
.get();
}
}
@Test
public void testErrorHandlingInFlow2() {
assertEquals(Arrays.asList("R: a", "R: b"), stringsService.process(Arrays.asList("a", "b")));
assertEquals(Arrays.asList("R: a", "Failure for oops"), stringsService.process(Arrays.asList("a", "oops")));
}
}
更新版本,正在运行,已应用建议。
@RunWith(SpringRunner.class)
public class ErrorHandlingTests2 {
interface StringsService {
@Nonnull
List<String> process(@Nonnull List<String> data);
}
@Autowired
StringsService stringsService;
@EnableIntegration
@Configuration
static class Config {
@Bean
IntegrationFlow errorHandler() {
return IntegrationFlows.from("errorChannel")
.<MessagingException>handle((ex, h) -> "Failure for " + ex.getFailedMessage().getPayload())
.get();
}
@Bean
IntegrationFlow errorsHandlingFlow2() {
return IntegrationFlows.from(StringsService.class, gws -> gws.errorChannel("errorChannel"))
.split(new AbstractMessageSplitter() {
@Override
protected Object splitMessage(Message<?> message) {
List<String> strings = (List<String>) message.getPayload();
return strings
.stream()
.map(r -> MessageBuilder
.withPayload(r)
.build())
.collect(Collectors.toList());
}
})
.transform(new AbstractPayloadTransformer<String, String>() {
@Override
protected String transformPayload(String s) {
if (s.contains("oops"))
throw new IllegalArgumentException("Bad value");
return "R: " + s;
}
}, c -> c.advice(advice()))
.aggregate(a -> a.outputProcessor(new AbstractAggregatingMessageGroupProcessor() {
@Override
protected Object aggregatePayloads(MessageGroup group, Map<String, Object> defaultHeaders) {
return group.getMessages()
.stream()
.map(m -> (String) m.getPayload())
.collect(Collectors.toList());
}
}))
.get();
}
@Bean
ExpressionEvaluatingRequestHandlerAdvice advice() {
ExpressionEvaluatingRequestHandlerAdvice advice = new ExpressionEvaluatingRequestHandlerAdvice();
advice.setReturnFailureExpressionResult(true);
advice.setOnFailureExpression(
new FunctionExpression<Message<?>>(s ->
MessageBuilder
.withPayload("Failure for " + s.getPayload())
.copyHeaders(s.getHeaders()).build())
);
return advice;
}
}
@Test
public void testErrorHandlingInFlow2() {
assertEquals(Arrays.asList("R: a", "R: b"), stringsService.process(Arrays.asList("a", "b")));
assertEquals(Arrays.asList("R: a", "Failure for oops", "R: b"), stringsService.process(Arrays.asList("a", "oops", "b")));
}
}
- 分离器支持Java
Stream
. - 拆分器默认填充那些 headers。不确定为什么需要自定义
IntegrationMessageHeaderAccessor.CORRELATION_ID
。在拆分器中是这样的:final Object correlationId = message.getHeaders().getId();
- 您在
transform()
之后已经有一个聚合器。因此,当 transform 抛出异常时,实际上是它没有到达聚合器。事实上,两者只是彼此不了解。这是 Spring 集成中首批 class 公民功能之一,其中端点是 loosely-couple,中间有消息通道。你可以从不同的地方向同一个端点发送消息。无论如何,我想即使使用普通的 Java 它也会以相同的方式运行:你有一个循环并在循环结束时将数据收集到一个列表中。现在成像你在收集逻辑之前的线上失败了,所以异常被抛给被调用的并且确实没有任何信息是如何收集的。
现在关于您预期逻辑的可能修复。请查看要应用于变压器的 ExpressionEvaluatingRequestHandlerAdvice
。因此,当您遇到异常时,它会在 failureChannel
sub-flow 中进行处理,并且会作为常规回复返回补偿消息,以便与其他消息汇总。有关详细信息,请参阅文档:https://docs.spring.io/spring-integration/docs/current/reference/html/messaging-endpoints.html#message-handler-advice-chain