如何查看在 Spring Integration 的 IntegrationFlow 中流动的类型

How to see the types that flows in Spring Integration's IntegrationFlow

当我在 Spring 集成中聚合时,我试图了解 returns 的类型是什么,这非常困难。我正在使用 Project Reactor,我的代码片段是:

public FluxAggregatorMessageHandler randomIdsBatchAggregator() {
    FluxAggregatorMessageHandler f = new FluxAggregatorMessageHandler();
    f.setWindowTimespan(Duration.ofSeconds(5));
    f.setCombineFunction(messageFlux -> messageFlux
        .map(Message::getPayload)
        .collectList()
        .map(GenericMessage::new);
    return f;
}

@Bean
public IntegrationFlow dataPipeline() {
   return IntegrationFlows.from(somePublisher)
// ----> The type Message<?> passed? Or Flux<Message<?>>?
      .handle(randomIdsBatchAggregator())
// ----> What type has been returned from the aggregation?
      .handle(bla())
      .get();
}

除了了解示例中传递的类型之外,我还想知道一般情况下如何知道 IntegrationFlow 中流动的对象及其类型。

IntegrationFlows.from(somePublisher)

这会在内部创建一个 FluxMessageChannel,它订阅提供的 Publsiher。每个事件都从这个频道发送到它的订阅者——你的聚合器。

FluxAggregatorMessageHandler 生成 setCombineFunction() JavaDocs 中解释的任何内容:

/**
 * Configure a transformation {@link Function} to apply for a {@link Flux} window to emit.
 * Requires a {@link Mono} result with a {@link Message} as value as a combination result
 * of the incoming {@link Flux} for window.
 * By default a {@link Flux} for window is fully wrapped into a message with headers copied
 * from the first message in window. Such a {@link Flux} in the payload has to be subscribed
 * and consumed downstream.
 * @param combineFunction the {@link Function} to use for result windows transformation.
 */
public void setCombineFunction(Function<Flux<Message<?>>, Mono<Message<?>>> combineFunction) {

因此,它是一个 Mono,其中包含您真正用 .collectList() 处理的消息。当框架从 FluxAggregatorMessageHandler 发出回复消息时,Mono 被框架订阅。因此,您的 .handle(bla()) 必须期望有效负载列表。这对于聚合器结果来说真的很自然。

在文档中查看更多信息:https://docs.spring.io/spring-integration/docs/current/reference/html/message-routing.html#flux-aggregator