如何查看在 Spring Integration 的 IntegrationFlow 中流动的类型
How to see the types that flows in Spring Integration's IntegrationFlow
当我在 Spring 集成中聚合时,我试图了解 returns 的类型是什么,这非常困难。我正在使用 Project Reactor,我的代码片段是:
public FluxAggregatorMessageHandler randomIdsBatchAggregator() {
FluxAggregatorMessageHandler f = new FluxAggregatorMessageHandler();
f.setWindowTimespan(Duration.ofSeconds(5));
f.setCombineFunction(messageFlux -> messageFlux
.map(Message::getPayload)
.collectList()
.map(GenericMessage::new);
return f;
}
@Bean
public IntegrationFlow dataPipeline() {
return IntegrationFlows.from(somePublisher)
// ----> The type Message<?> passed? Or Flux<Message<?>>?
.handle(randomIdsBatchAggregator())
// ----> What type has been returned from the aggregation?
.handle(bla())
.get();
}
除了了解示例中传递的类型之外,我还想知道一般情况下如何知道 IntegrationFlow
中流动的对象及其类型。
IntegrationFlows.from(somePublisher)
这会在内部创建一个 FluxMessageChannel
,它订阅提供的 Publsiher
。每个事件都从这个频道发送到它的订阅者——你的聚合器。
FluxAggregatorMessageHandler
生成 setCombineFunction()
JavaDocs 中解释的任何内容:
/**
* Configure a transformation {@link Function} to apply for a {@link Flux} window to emit.
* Requires a {@link Mono} result with a {@link Message} as value as a combination result
* of the incoming {@link Flux} for window.
* By default a {@link Flux} for window is fully wrapped into a message with headers copied
* from the first message in window. Such a {@link Flux} in the payload has to be subscribed
* and consumed downstream.
* @param combineFunction the {@link Function} to use for result windows transformation.
*/
public void setCombineFunction(Function<Flux<Message<?>>, Mono<Message<?>>> combineFunction) {
因此,它是一个 Mono
,其中包含您真正用 .collectList()
处理的消息。当框架从 FluxAggregatorMessageHandler
发出回复消息时,Mono
被框架订阅。因此,您的 .handle(bla())
必须期望有效负载列表。这对于聚合器结果来说真的很自然。
当我在 Spring 集成中聚合时,我试图了解 returns 的类型是什么,这非常困难。我正在使用 Project Reactor,我的代码片段是:
public FluxAggregatorMessageHandler randomIdsBatchAggregator() {
FluxAggregatorMessageHandler f = new FluxAggregatorMessageHandler();
f.setWindowTimespan(Duration.ofSeconds(5));
f.setCombineFunction(messageFlux -> messageFlux
.map(Message::getPayload)
.collectList()
.map(GenericMessage::new);
return f;
}
@Bean
public IntegrationFlow dataPipeline() {
return IntegrationFlows.from(somePublisher)
// ----> The type Message<?> passed? Or Flux<Message<?>>?
.handle(randomIdsBatchAggregator())
// ----> What type has been returned from the aggregation?
.handle(bla())
.get();
}
除了了解示例中传递的类型之外,我还想知道一般情况下如何知道 IntegrationFlow
中流动的对象及其类型。
IntegrationFlows.from(somePublisher)
这会在内部创建一个 FluxMessageChannel
,它订阅提供的 Publsiher
。每个事件都从这个频道发送到它的订阅者——你的聚合器。
FluxAggregatorMessageHandler
生成 setCombineFunction()
JavaDocs 中解释的任何内容:
/**
* Configure a transformation {@link Function} to apply for a {@link Flux} window to emit.
* Requires a {@link Mono} result with a {@link Message} as value as a combination result
* of the incoming {@link Flux} for window.
* By default a {@link Flux} for window is fully wrapped into a message with headers copied
* from the first message in window. Such a {@link Flux} in the payload has to be subscribed
* and consumed downstream.
* @param combineFunction the {@link Function} to use for result windows transformation.
*/
public void setCombineFunction(Function<Flux<Message<?>>, Mono<Message<?>>> combineFunction) {
因此,它是一个 Mono
,其中包含您真正用 .collectList()
处理的消息。当框架从 FluxAggregatorMessageHandler
发出回复消息时,Mono
被框架订阅。因此,您的 .handle(bla())
必须期望有效负载列表。这对于聚合器结果来说真的很自然。