Spring Integration DSL ScatterGather flow blocks
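I have an integration flow that performs a scatter-gather operation hitting multiple HTTP endpoints that return JSON. The results are then aggregated into a single JSON object. The flow looks like this: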
@Bean
public IntegrationFlow myFlow(IMyService myService, IMyOtherService myOtherService) {
    return f -> f.enrichHeaders(eh -> eh.headerExpression(Headers.PAYLOAD, "payload"))
            .handle(HeaderPrinter::headerPrinter)
            .enrichHeaders(httpRequestHeaderEnricher())
            .scatterGather(
                    scatterer -> scatterer
                            .recipientFlow(sf -> sf.enrichHeaders(he -> he.header(Headers.DATA_ENDPOINT, "endpoint1"))
                                    .handle(createOutboundHttpGateway(baseUrl, httpRequestFactory)))
                            .recipientFlow(sf -> sf.enrichHeaders(he -> he.header(Headers.DATA_ENDPOINT, "endpoint2"))
                                    .handle(createOutboundHttpGateway(baseUrl, httpRequestFactory)))
                            .applySequence(true),
                    gatherer -> gatherer.outputProcessor(MyFlows::aggregateJsonFromMultipleSources)
            )
            .handle(myService, "handleAggregatedJson")
            .handle(HeaderPrinter::headerPrinter)
            .handle(myOtherService, "handleMyServiceOutput")
            .channel("myFlow.output");
}
I start the flow through a gateway declared as follows:
@MessagingGateway
public interface IMyGateway {
    @Gateway(requestChannel = "myFlow.input", replyChannel = "myFlow.output")
    MyResult startFlow(@Payload String payload, @Header("header1") String header1, @Header("header2") String header2);
}
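The problem I'm having is that the whole flow blocks and the gateway times out. I put breakpoints in the two service calls, IMyService::handleAggregatedJson and IMyOtherService::handleMyServiceOutput, and both of them run, but the output never reaches the gateway's reply channel. If I remove the last two handle operations, the flow returns a result through the gateway as normal.
I looked at the stack trace while the flow was blocked, and I can see that the thread running the flow is waiting on a lock: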
java.lang.Thread.State: WAITING
    at sun.misc.Unsafe.park(Unsafe.java:-1)
    at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
    at java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836)
    at java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:997)
    at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1304)
    at java.util.concurrent.CountDownLatch.await(CountDownLatch.java:231)
    at org.springframework.messaging.core.GenericMessagingTemplate$TemporaryReplyChannel.receive(GenericMessagingTemplate.java:308)
    at org.springframework.messaging.core.GenericMessagingTemplate$TemporaryReplyChannel.receive(GenericMessagingTemplate.java:300)
    at org.springframework.messaging.core.GenericMessagingTemplate.doReceive(GenericMessagingTemplate.java:201)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSendAndReceive(GenericMessagingTemplate.java:234)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSendAndReceive(GenericMessagingTemplate.java:47)
    at org.springframework.messaging.core.AbstractMessagingTemplate.sendAndReceive(AbstractMessagingTemplate.java:45)
    at org.springframework.integration.core.MessagingTemplate.sendAndReceive(MessagingTemplate.java:97)
    at org.springframework.integration.core.MessagingTemplate.sendAndReceive(MessagingTemplate.java:38)
    at org.springframework.messaging.core.AbstractMessagingTemplate.convertSendAndReceive(AbstractMessagingTemplate.java:95)
    at org.springframework.messaging.core.AbstractMessagingTemplate.convertSendAndReceive(AbstractMessagingTemplate.java:85)
    at org.springframework.integration.gateway.MessagingGatewaySupport.doSendAndReceive(MessagingGatewaySupport.java:487)
    at org.springframework.integration.gateway.MessagingGatewaySupport.sendAndReceive(MessagingGatewaySupport.java:461)
    at org.springframework.integration.gateway.GatewayProxyFactoryBean.invokeGatewayMethod(GatewayProxyFactoryBean.java:520)
    at org.springframework.integration.gateway.GatewayProxyFactoryBean.doInvoke(GatewayProxyFactoryBean.java:469)
    at org.springframework.integration.gateway.GatewayProxyFactoryBean.invoke(GatewayProxyFactoryBean.java:460)
    at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:185)
    at org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:212)
    at com.sun.proxy.$Proxy116.startFlow(Unknown Source:-1)
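As far as I can tell, if the flow takes longer than X, it blocks. I tried putting a rendezvous channel between the flow and the gateway, but that didn't seem to help.
Any ideas about what is causing the timeout?
Addendum: I have been fiddling with the code, and removing the return type on the gateway together with the final .channel call on the flow seems to stop it from blocking.
The following works fine: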
@Bean
public IntegrationFlow myFlow(IMyService myService, IMyOtherService myOtherService) {
    return f -> f.enrichHeaders(eh -> eh.headerExpression(Headers.PAYLOAD, "payload"))
            .handle(HeaderPrinter::headerPrinter)
            .enrichHeaders(httpRequestHeaderEnricher())
            .scatterGather(
                    scatterer -> scatterer
                            .recipientFlow(sf -> sf.enrichHeaders(he -> he.header(Headers.DATA_ENDPOINT, "endpoint1"))
                                    .handle(createOutboundHttpGateway(baseUrl, httpRequestFactory)))
                            .recipientFlow(sf -> sf.enrichHeaders(he -> he.header(Headers.DATA_ENDPOINT, "endpoint2"))
                                    .handle(createOutboundHttpGateway(baseUrl, httpRequestFactory)))
                            .applySequence(true),
                    gatherer -> gatherer.outputProcessor(MyFlows::aggregateJsonFromMultipleSources)
            )
            .handle(myService, "handleAggregatedJson")
            .handle(HeaderPrinter::headerPrinter)
            .handle(myOtherService, "handleMyServiceOutput")
            // One-way terminal handler: logs the final payload and produces no reply.
            .handle(m -> {
                log.info("Flow completed successfully, payload as expected:" + m.getPayload());
            });
}
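I wonder whether your
.handle(myService, "handleAggregatedJson")
.handle(HeaderPrinter::headerPrinter)
.handle(myOtherService, "handleMyServiceOutput")
return some value after the gather. A typical mistake with request-reply is that some step in the flow stops replying with a reasonable value.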
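To make that failure mode concrete, here is a minimal plain-JDK sketch of the wait seen in the stack trace. It is not Spring's actual code: TempReplyChannel and ReplyDemo are hypothetical stand-ins that only mimic a caller parked on a latch until a reply is sent. It assumes, as in Spring Integration, that a step returning null produces no reply message.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;

/** Hypothetical stand-in for a gateway's temporary reply channel. */
final class TempReplyChannel {
    private final CountDownLatch latch = new CountDownLatch(1);
    private volatile String reply;

    /** Stores the reply and releases the waiting caller. */
    void send(String message) {
        reply = message;
        latch.countDown();
    }

    /** Waits up to timeoutMillis for a reply; returns null if none arrives. */
    String receive(long timeoutMillis) {
        try {
            return latch.await(timeoutMillis, TimeUnit.MILLISECONDS) ? reply : null;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        }
    }
}

final class ReplyDemo {
    /**
     * Runs the payload through the last step on another thread and waits for a reply.
     * A null result from the step means no reply is ever sent.
     */
    static String sendAndReceive(String payload, Function<String, String> lastStep, long timeoutMillis) {
        TempReplyChannel replyChannel = new TempReplyChannel();
        Thread flow = new Thread(() -> {
            String result = lastStep.apply(payload);
            if (result != null) {
                replyChannel.send(result);
            }
        });
        flow.start();
        return replyChannel.receive(timeoutMillis);
    }

    public static void main(String[] args) {
        // The last step returns a value, so the caller gets a reply.
        System.out.println(sendAndReceive("json", p -> p + "-handled", 500));
        // The last step returns null, so the caller waits until the timeout.
        System.out.println(sendAndReceive("json", p -> null, 500));
    }
}
```

The second call only returns because the sketch passes a timeout. A caller that waits without one would stay parked on the latch, which matches the WAITING state in the question.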
UPDATE
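You should consider removing the explicit replyChannel declaration from the @Gateway definition and removing .channel("myFlow.output") from the end of the flow. That way the reply is sent to the channel in the replyChannel header. When you configure an explicit replyChannel, there is no guarantee that some other subscriber on that channel won't "steal" your reply message.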
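A minimal sketch of that arrangement might look like the following. The names ReplyHeaderSketch, SketchGateway and sketchFlow are hypothetical, and the flow is reduced to a single transformer rather than the scatter-gather from the question. It assumes Spring Integration with the Java DSL on the classpath.

```java
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.Gateway;
import org.springframework.integration.annotation.IntegrationComponentScan;
import org.springframework.integration.annotation.MessagingGateway;
import org.springframework.integration.config.EnableIntegration;
import org.springframework.integration.dsl.IntegrationFlow;

@Configuration
@EnableIntegration
@IntegrationComponentScan
public class ReplyHeaderSketch {

    // No replyChannel attribute: the gateway waits on a temporary channel
    // that it places in the replyChannel header of each request.
    @MessagingGateway
    public interface SketchGateway {
        @Gateway(requestChannel = "sketchFlow.input")
        String startFlow(String payload);
    }

    // No trailing .channel(...): the last endpoint has no output channel,
    // so its non-null result is sent to the replyChannel header.
    @Bean
    public IntegrationFlow sketchFlow() {
        return f -> f.<String, String>transform(String::toUpperCase);
    }
}
```

With this shape, every step up to the end of the flow still has to return a non-null value for the gateway call to receive a reply.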
See the Reference Manual for more information.