Spring 拆分和聚合流中的集成验证

Spring integration validation in split & aggregate flow


public IntegrationFlow flow() {
     return f -> f
         .split(Orders.class, Orders::getItems)
         .enrich(e -> e.requestChannel("enrichChannel"))
         .filter(Order.class, c -> c.getId() > 10 ? true : false,
             e -> e.discardChannel(validationError()))
         .handle(new MyHandler())
         .transform(new MapToObjectTransformer(Order.class))
         .enrich(e -> e.requestChannel("transformChannel"))
         .filter(Order.class, c -> c.getTotal() > 100 ? true : false,
             e -> e.discardChannel(validationError())).handle( new transformer())
public IntegrationFlow validationErrorFlow() {
 return IntegrationFlows.from(validationError())
         .handle(new ValidationHandler())


我可以编写路由和子流映射,但是这会在路由 -> 子流 -> 路由 -> 子流中变得过于嵌套,试图通过使用过滤器来解决这个问题。有没有更好的方法来执行验证并继续拆分流程中的所有项目。

更新 1:

.handle(request.class, (p, h) -> validator.validate(p)
.handle(new MyHandler())

    public IntegrationFlow filterFlow() {
        return f -> f
                .filter(response.class, c -> c.isValidationStatus(), df -> df.discardFlow
                        (flow -> flow.handle(Message.class, (p, h) -> p.getPayload())));

网关能够拦截请求,但执行的流程 .handle(new MyHandler()) 而不是 split()


更新 2:(答案)来自 Artem

.handle(request.class, (p, h) -> validator.validate(p))
    .filter(response.class,p -> p.isValidationStatus(), f -> f.discardChannel("aggregatorChannel"))
    .handle(new MyHandler())


the discard channel is not joining back to the main flow to execute the next item in the split.

没错。这就是它的设计方式。在大多数情况下,丢弃流类似于 JMS 中的 Dead Letter Queue。所以,这是短 one-way 分支。


.filter(Order.class, c -> c.getId() > 10 ? true : false,
                    e -> e.discardFlow(sf -> sf
            .handle(new MyHandler())

我使用 gateway() 因为我们需要丢弃流的回复才能继续处理。我们需要 sub-flow 末尾的 .channel("myHandleChannel") 因为丢弃流是一个分支。

另一种方法可以通过主流程上的 .gateway() 实现:

.handle(new MyHandler())


public IntegrationFlow filterFlow() {
    return f -> f
            .filter(Order.class, c -> c.getId() > 10 ? true : false,
                    e -> e.discardChannel(validationError()));

我们向 discardChannel 发送相同的请求消息,因此上述网关的正确 replyChannel header 仍然存在。您只需要确保从 .handle(new ValidationHandler()).
