Spring 集成 - 重播相同的流程

Spring Integration - replay the same flow

我有一个类似

的 SI DSL 流程
@Bean
public IntegrationFlow processRequest() {
    return flow -> flow.channel(REQUEST_INPUT)
            .transform(...)
            .enrichHeaders(h -> h.<Storage>headerFunction(...)))
            .enrich(this::...)

            //.route(ifReplayIsNeeded(), routeToSameFlow())

            .enrich(this::...)
            .route(..., ...)
            .route(..., ...)
            .enrich(this::...)
            .handle(...)

            //.route(ifReplayWasNeeded(), routeBack())

            .route(..., ...)
            .enrich(this::...)
            .transform(...)
            .channel(REQUEST_OUTPUT);
}

因此,当条件满足时(参见 ifReplayIsNeeded()),则必须再次调用 processRequest() 流程。然而,并非必须执行整个流程,但接近尾声时 (-> ifReplayWasNeeded()),此内部流程必须返回到调用它的位置并完全处理原始流程。

routeToSameFlow() 看起来像(Storage 用于存储 requests/responses 和流程中使用的其他数据)

Consumer<RouterSpec<Boolean, MethodInvokingRouter>> routeToSameFlow() {
    return rs -> rs.resolutionRequired(false)
            .subFlowMapping(true, sf -> sf
                    // 1. storing the current Storage
                    .enrichHeaders(h -> h.<Storage>headerFunction("store", s -> s))
                    // 2. creating the req for the internal flow
                    .transform(internalRequestMapper, "mapFrom")
                    // 3. routing to the beginning of the flow
                    .route(Message.class, (m) -> REQUEST_INPUT)
                    // 4. defining the channel where the internal flow will return to
                    .channel("RETURN_CHANNEL")
            )
            .defaultOutputToParentFlow();
}

routeBack()

Consumer<RouterSpec<Boolean, MethodInvokingRouter>> routeBack() {
    return rs -> rs.resolutionRequired(false)
            .subFlowMapping(true, sf ->
                    sf.route(Message.class, (m) -> "RETURN_CHANNEL")
            )
            .defaultOutputToParentFlow();
}

肯定是我遗漏了一些概念,因为我收到以下错误:

原因:org.springframework.beans.factory.BeanCreationException:'currentComponent' (org.springframework.integration.router.MethodInvokingRouter@60a0f09f) 是一种单向 'MessageHandler',不合适配置 'outputChannel'。集成流程到此结束。

你能帮我实现这样的逻辑吗?我应该将主要流程拆分为更小的 IntegrationFlow 吗?
我想尽可能少地打乱主流,所以我只想在开头加一条导流路线,在末尾加一条return路线。可以吗?

谢谢!
问候,
五、

您可能将所有这些路由器的逻辑复杂化了。考虑在你的主要流程中间只有简单的 channel() 定义,并且作为你想要返回的那个子流程的输出。 Java DSL 中没有任何限制,只要在开始和结束时有 channel() 定义即可:您可以在流定义的任意点自由添加 channel()。例如:

 .handle(...)
 .channel("middleOfTheFlow")
 .route(ifReplayWasNeeded(), routeBack())

那么你的routeBack()流定义可以在最后有.channel("middleOfTheFlow"),在这里处理后的消息将被传递到主流的middleOfTheFlow通道。

换句话说,IntegrationFlow只是逻辑组件,用于将有关某些业务功能的端点保持在同一位置,但这并不意味着流程中的所有端点都非常严格地相互关联.您始终可以在两者之间建立一个通道,并从其他地方向该通道发送消息,订阅该通道的端点将处理该消息。

在文档中查看更多信息:https://docs.spring.io/spring-integration/docs/current/reference/html/dsl.html#java-dsl