Spring 拆分和聚合流中的集成验证
Spring integration validation in split & aggregate flow
我正在尝试添加过滤器以丢弃流程并在失败后继续执行主流并聚合拆分器。错误和成功的预期类型相同。没有特定的聚合器逻辑。
@Bean
public IntegrationFlow flow() {
return f -> f
.split(Orders.class, Orders::getItems)
.enrich(e -> e.requestChannel("enrichChannel"))
.filter(Order.class, c -> c.getId() > 10 ? true : false,
e -> e.discardChannel(validationError()))
.handle(new MyHandler())
.transform(new MapToObjectTransformer(Order.class))
.enrich(e -> e.requestChannel("transformChannel"))
.filter(Order.class, c -> c.getTotal() > 100 ? true : false,
e -> e.discardChannel(validationError())).handle( new transformer())
.aggregate();
}
@Bean
public IntegrationFlow validationErrorFlow() {
return IntegrationFlows.from(validationError())
.handle(new ValidationHandler())
.get();
}
丢弃通道没有连接回主流以执行拆分中的下一个项目。
我可以编写路由和子流映射,但是这会在路由 -> 子流 -> 路由 -> 子流中变得过于嵌套,试图通过使用过滤器来解决这个问题。有没有更好的方法来执行验证并继续拆分流程中的所有项目。
更新 1:
.handle(request.class, (p, h) -> validator.validate(p)
.gateway("filterFlow.input")
.handle(new MyHandler())
.enrich(...)
.handle(...)
.enrich(...)
.handle(...)
.enrich(...)
.handle(...)
.aggregate();
@Bean
public IntegrationFlow filterFlow() {
return f -> f
.filter(response.class, c -> c.isValidationStatus(), df -> df.discardFlow
(flow -> flow.handle(Message.class, (p, h) -> p.getPayload())));
}
网关能够拦截请求,但执行的流程 .handle(new MyHandler())
而不是 split()
中的下一项
更新 2:(答案)来自 Artem
.handle(request.class, (p, h) -> validator.validate(p))
.filter(response.class,p -> p.isValidationStatus(), f -> f.discardChannel("aggregatorChannel"))
.handle(new MyHandler())
.enrich(...)
.handle(...)
.enrich(...)
.handle(...)
.enrich(...)
.handle(...)
.channel("aggregatorChannel")
.aggregate();
这将有条件地跳过并继续流程。
the discard channel is not joining back to the main flow to execute the next item in the split.
没错。这就是它的设计方式。在大多数情况下,丢弃流类似于 JMS 中的 Dead Letter Queue。所以,这是短 one-way 分支。
如果你真的想回到主流程,你应该考虑在流程定义中使用命名通道。我的意思是在补偿(丢弃)流之后你想回来的点:
.filter(Order.class, c -> c.getId() > 10 ? true : false,
e -> e.discardFlow(sf -> sf
.gateway(validationError())
.channel("myHandleChannel")))
.channel("myHandleChannel")
.handle(new MyHandler())
我使用 gateway()
因为我们需要丢弃流的回复才能继续处理。我们需要 sub-flow 末尾的 .channel("myHandleChannel")
因为丢弃流是一个分支。
另一种方法可以通过主流程上的 .gateway()
实现:
.gateway("filterFlow.input")
.handle(new MyHandler())
...
@Bean
public IntegrationFlow filterFlow() {
return f -> f
.filter(Order.class, c -> c.getId() > 10 ? true : false,
e -> e.discardChannel(validationError()));
}
我们向 discardChannel
发送相同的请求消息,因此上述网关的正确 replyChannel
header 仍然存在。您只需要确保从 .handle(new ValidationHandler())
.
生成正确的回复
我正在尝试添加过滤器以丢弃流程并在失败后继续执行主流并聚合拆分器。错误和成功的预期类型相同。没有特定的聚合器逻辑。
@Bean
public IntegrationFlow flow() {
return f -> f
.split(Orders.class, Orders::getItems)
.enrich(e -> e.requestChannel("enrichChannel"))
.filter(Order.class, c -> c.getId() > 10 ? true : false,
e -> e.discardChannel(validationError()))
.handle(new MyHandler())
.transform(new MapToObjectTransformer(Order.class))
.enrich(e -> e.requestChannel("transformChannel"))
.filter(Order.class, c -> c.getTotal() > 100 ? true : false,
e -> e.discardChannel(validationError())).handle( new transformer())
.aggregate();
}
@Bean
public IntegrationFlow validationErrorFlow() {
return IntegrationFlows.from(validationError())
.handle(new ValidationHandler())
.get();
}
丢弃通道没有连接回主流以执行拆分中的下一个项目。
我可以编写路由和子流映射,但是这会在路由 -> 子流 -> 路由 -> 子流中变得过于嵌套,试图通过使用过滤器来解决这个问题。有没有更好的方法来执行验证并继续拆分流程中的所有项目。
更新 1:
.handle(request.class, (p, h) -> validator.validate(p)
.gateway("filterFlow.input")
.handle(new MyHandler())
.enrich(...)
.handle(...)
.enrich(...)
.handle(...)
.enrich(...)
.handle(...)
.aggregate();
@Bean
public IntegrationFlow filterFlow() {
return f -> f
.filter(response.class, c -> c.isValidationStatus(), df -> df.discardFlow
(flow -> flow.handle(Message.class, (p, h) -> p.getPayload())));
}
网关能够拦截请求,但执行的流程 .handle(new MyHandler())
而不是 split()
更新 2:(答案)来自 Artem
.handle(request.class, (p, h) -> validator.validate(p))
.filter(response.class,p -> p.isValidationStatus(), f -> f.discardChannel("aggregatorChannel"))
.handle(new MyHandler())
.enrich(...)
.handle(...)
.enrich(...)
.handle(...)
.enrich(...)
.handle(...)
.channel("aggregatorChannel")
.aggregate();
这将有条件地跳过并继续流程。
the discard channel is not joining back to the main flow to execute the next item in the split.
没错。这就是它的设计方式。在大多数情况下,丢弃流类似于 JMS 中的 Dead Letter Queue。所以,这是短 one-way 分支。
如果你真的想回到主流程,你应该考虑在流程定义中使用命名通道。我的意思是在补偿(丢弃)流之后你想回来的点:
.filter(Order.class, c -> c.getId() > 10 ? true : false,
e -> e.discardFlow(sf -> sf
.gateway(validationError())
.channel("myHandleChannel")))
.channel("myHandleChannel")
.handle(new MyHandler())
我使用 gateway()
因为我们需要丢弃流的回复才能继续处理。我们需要 sub-flow 末尾的 .channel("myHandleChannel")
因为丢弃流是一个分支。
另一种方法可以通过主流程上的 .gateway()
实现:
.gateway("filterFlow.input")
.handle(new MyHandler())
...
@Bean
public IntegrationFlow filterFlow() {
return f -> f
.filter(Order.class, c -> c.getId() > 10 ? true : false,
e -> e.discardChannel(validationError()));
}
我们向 discardChannel
发送相同的请求消息,因此上述网关的正确 replyChannel
header 仍然存在。您只需要确保从 .handle(new ValidationHandler())
.