PayloadTypeRouter 直接发送到 Transformer 之间没有通道?

PayloadTypeRouter directly sending into a Transformer without channel in between?

我需要使用 PayloadTypeRouter 并希望将路由消息直接发送到 TransformerFilterServiceActivator。所有都应该使用 Kotlin DSL(或 Java DSL)进行配置。

目前,一段漂亮的代码如下所示:

    @Bean
    fun routeAzureC2DMessage() = integrationFlow {
        channel(IoTHubChannelNames.IOTHUB_TO_DEVICE_CHANNEL)
        route<IotHubC2DRequestMessage<IotHubC2DRequest>> {
            when (it.payload) {
                is IotHubDesiredPropertyUpdate -> IoTHubChannelNames.DESIRED_PROPERTY_UPDATE
                is IotHubMessageToDevice -> IoTHubChannelNames.MESSAGE_TO_DEVICE
            }
        }
    }

然后继续(路由的一侧)

    @Bean
    fun processMessageToDevice() = integrationFlow {
        channel(IoTHubChannelNames.MESSAGE_TO_DEVICE)
        filter(StructureFilter())
        transform(MessageTransformer())
        channel(SharedChannelNames.CLOUD2DEVICE)
    }

我想删除不需要的频道 IoTHubChannelNames.MESSAGE_TO_DEVICE。我尝试了几种方法,在项目的另一部分我想出了这样的东西 (Java DSL)

IntegrationFlows
            .from(channelName)
            .route({ message: IotHubMessage -> message.javaClass }) { router: RouterSpec<Class<*>?, MethodInvokingRouter?> ->
                router
                    .subFlowMapping(DeviceToCloudMessage::class.java) {
                        it.handle(gateway, "handleD2CMessage")
                    }
                    .subFlowMapping(DeviceTwinUpdateMessage::class.java) {
                        it.handle(gateway, "handleDeviceTwinReportedProperty")
                    }
            }
            .get()

subFlowMapping是摆脱中间通道的唯一方法吗?我想要一个解决方案,我仍然可以使用 when (it.payload) 然后可以 return 新的 integrationFlow 或其他形式的流定义来代替 channel/channel 名称。

目前唯一的解决办法是 API:

inline fun <reified P, T> route(
        crossinline function: (P) -> T,
        crossinline configurer: KotlinRouterSpec<T, MethodInvokingRouter>.() -> Unit) {

你所问的 when(...) is 语法目前不受支持。

随时就此事提出 GH 问题,并尽可能详细地分享从 Kotlin 的角度来看它是什么以及如何在 Spring 集成 DSL 中使用它。

更新

另一方面,当前的 Kotlin 支持还不错:

            route<Int, Boolean>({ it % 2 == 0 }) {
                subFlowMapping(true) { handle<Int> { p, _ -> p * 2 } }
                subFlowMapping(false) { handle<Int> { p, _ -> p * 3 } }
            }

因此,route() 方法的参数是 when()subFlowMapping()is-> 输出为 integrationFlow生成器结果。因此,我们可能不会追求 Kotlin when(),除非我们失去 subFlowMapping 以支持 -> 运算符...

,否则不会给我们带来太多收益

更新 2

经过更多思考并寻找可能的解决方案后,我不得不撤回对 when() 类功能请求的请求。

IntegrationFlow 的主要问题及其所有配置必须在使用前预先在应用程序上下文中注册。您在原始路由器功能中使用 when() 询问的内容不是框架可以为您检测和处理的内容。此函数不属于框架的一部分,不对产生的结果负责。

好吧,我们可以检查 IntegrationFlow return 来决定如何调用,但不能保证您将从该函数转到 return 的流程是一个注册的bean。当我们真正注册它并尝试在这样的函数中使用它时,这与我们到目前为止从该函数 returning 通道以及它们在某处的某个流 bean 中的映射没有什么区别。

我们可以在一些为此指令设计的指令中自动将 IntegrationFlow 注册为 bean,例如 subFlowMapping()。无论如何,它只在配置阶段完成一次。但是,当最终用户代码 returns 在运行时流动时,这样做并不是很好。最好是 return 通道或一些我们有现有流映射的其他键。

我个人更喜欢不要忽略 MessageChannel 抽象并在我需要在不同流之间分配逻辑时使用它。当单个流中的代码是线性的并且代表单个逻辑工作单元时,它看起来更清晰。不过,其他流程可能会在其他逻辑中重用。我只需要从其他地方指向那些流量输入通道!

尽管如此,我的主要观点是:我们必须先注册并IntegrationFlow,然后才能向它发送消息。