spring-integration parallel split-route-aggregate flow fails due to one-way MessageHandler
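I want to process a list of items in parallel by splitting it, routing each item to its appropriate gateway, and aggregating the results. However, my application does not start, and I get the following error: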
BeanCreationException: The 'currentComponent' ... is a one-way 'MessageHandler'
and it isn't appropriate to configure 'outputChannel'.
This is the end of the integration flow.
Here is a sample flow definition that illustrates the behavior:
@Bean
public IntegrationFlow parallelSplitRouteAggregateFlow() {
    return IntegrationFlows
            .from(Http.inboundGateway("/trigger"))
            .handle(message -> Arrays.asList(1, 2, 3))
            .split()
            .channel(MessageChannels.executor(Executors.newCachedThreadPool()))
            .<Integer, Boolean>route(o -> o % 2 == 0, m -> m
                    .subFlowMapping(true, oddFlow())
                    .subFlowMapping(false, evenFlow()))
            .aggregate()
            .get();
}

@Bean
public IntegrationFlow oddFlow() {
    return flow -> flow.<Integer>handle((payload, headers) -> "odd");
}

@Bean
public IntegrationFlow evenFlow() {
    return flow -> flow.<Integer>handle((payload, headers) -> "even");
}
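I have looked at a similar question, but the solution there does not apply here: I am not logging in the handle() method.
I also tried adding .defaultOutputToParentFlow() to the mapping definition, because the cafe example uses it, but that made no difference either.
I should mention that this is spring-integration 5.0.4 with spring-boot 2.0.1.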
Your problem is here:
.handle(message -> Arrays.asList(1, 2, 3))
If you wrote that as an inline implementation, it would look like this:
.handle(new MessageHandler() {
    @Override
    public void handleMessage(Message<?> message) throws MessagingException {
        Arrays.asList(1, 2, 3);
    }
})
Note the void return type. Since nothing is returned, there is nothing to send downstream, hence the "is a one-way 'MessageHandler'" message.
To fix the problem, you need to do this:
.handle((p, h) -> Arrays.asList(1, 2, 3))
which is equivalent to:
.handle(new GenericHandler<Object>() {
    @Override
    public Object handle(Object p, Map<String, Object> h) {
        return Arrays.asList(1, 2, 3);
    }
})
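The difference between the two lambdas can be reproduced without Spring at all. The following is a minimal sketch, assuming two hypothetical stand-in interfaces (OneWayHandler and ReplyingHandler, invented here to mirror the void and value-returning shapes shown above) and a hypothetical class name; it is not the framework's code, only an illustration of how the number of lambda parameters picks the overload.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.Map;

public class HandleOverloadSketch {

    // Hypothetical stand-in for a one-way handler: void, so there is no reply.
    interface OneWayHandler {
        void handleMessage(Object message);
    }

    // Hypothetical stand-in for a reply-producing handler: the return value is the reply.
    interface ReplyingHandler {
        Object handle(Object payload, Map<String, Object> headers);
    }

    // Overload chosen for one-argument lambdas; whatever the lambda computes is discarded.
    static String handle(OneWayHandler handler) {
        handler.handleMessage("ping");
        return "one-way: nothing to send downstream";
    }

    // Overload chosen for two-argument lambdas; the lambda result is passed on.
    static String handle(ReplyingHandler handler) {
        Object reply = handler.handle("ping", Collections.emptyMap());
        return "reply: " + reply;
    }

    public static void main(String[] args) {
        // One parameter: only OneWayHandler fits, so the list is silently dropped.
        System.out.println(handle(message -> Arrays.asList(1, 2, 3)));
        // Two parameters: only ReplyingHandler fits, so the list becomes the reply.
        System.out.println(handle((p, h) -> Arrays.asList(1, 2, 3)));
    }
}
```

The first call compiles because a method invocation is allowed as the body of a void lambda, which is why the mistake only shows up when the flow is built rather than at compile time.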
In fact, my IDEA highlights your variant, which gives a hint that something is wrong with it.
UPDATE
Working code:
@Bean
public IntegrationFlow parallelSplitRouteAggregateFlow() {
    return IntegrationFlows
            .from(Http.inboundGateway("/trigger"))
            .handle((p, h) -> Arrays.asList(1, 2, 3))
            .split()
            .channel(MessageChannels.executor(Executors.newCachedThreadPool()))
            .<Integer, Boolean>route(o -> o % 2 == 0, m -> m
                    .subFlowMapping(true, sf -> sf.gateway(oddFlow()))
                    .subFlowMapping(false, sf -> sf.gateway(evenFlow())))
            .aggregate()
            .get();
}

@Bean
public IntegrationFlow oddFlow() {
    return flow -> flow.<Integer>handle((payload, headers) -> "odd");
}

@Bean
public IntegrationFlow evenFlow() {
    return flow -> flow.<Integer>handle((payload, headers) -> "even");
}
Alternatively, without the gateways, each sub-flow can send its result to the input channel of a separate aggregator flow:
@Bean
public IntegrationFlow parallelSplitRouteAggregateFlow() {
    return IntegrationFlows
            .from(Http.inboundGateway("/trigger"))
            .handle((p, h) -> Arrays.asList(1, 2, 3))
            .split()
            .channel(MessageChannels.executor(Executors.newCachedThreadPool()))
            .<Integer, Boolean>route(o -> o % 2 == 0, m -> m
                    .subFlowMapping(true, oddFlow())
                    .subFlowMapping(false, evenFlow()))
            .get();
}

@Bean
public IntegrationFlow oddFlow() {
    return flow -> flow.<Integer>handle((payload, headers) -> "odd")
            .channel("agg.input");
}

@Bean
public IntegrationFlow evenFlow() {
    return flow -> flow.<Integer>handle((payload, headers) -> "even")
            .channel("agg.input");
}

@Bean
public IntegrationFlow agg() {
    return f -> f.aggregate();
}
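If you need to "distribute" a message across several "workers" and get the messages back at a join point, you can use the .scatterGather(...) method. Seemingly, it wraps the .route(...) functionality in a way that is better suited for use within the IntegrationFlow domain.
As the following example shows: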
@Bean
public IntegrationFlow evenOddFlows() {
    return IntegrationFlows.from(Http.inboundGateway("/trigger"))
            .handle((payload, headers) -> Arrays.asList(1, 2, 3))
            .split()
            .scatterGather(r -> r.applySequence(true)
                    .recipientFlow(m -> (int) m.getPayload() % 2 == 0, evenFlow -> evenFlow
                            .log(m -> "Even flow with payload: " + m.getPayload())
                            .<Integer, Integer>transform(h -> h + 50)
                            .handle((payload, headers) -> (int) payload + 50)
                            .log(m -> "At Even flow end with payload: " + m.getPayload())
                            /* The .handle(...) below does no real work.
                             * It patches something that, at least to me, looks like a bug.
                             * Without it, the .log(...) would be the last step of the sub-flow.
                             * When a .log(...) sits right at the end of the flow, the reply message
                             * does not arrive back at the parent flow (hence my suspicion of a bug).
                             * With the appended .handle(...), the .log(...) is no longer last, and
                             * tests show that the parent flow receives the message.
                             */
                            .handle((payload, headers) -> payload))
                    .recipientFlow(m -> (int) m.getPayload() % 2 != 0, oddFlow -> oddFlow
                            .log(m -> "Odd flow with payload: " + m.getPayload())
                            .<Integer, Integer>transform(h -> h + 10)
                            .handle((payload, headers) -> (int) payload + 10)
                            .log(m -> "At Odd flow end with payload: " + m.getPayload())
                            // Same patch as above: otherwise the trailing .log(...) swallowed the message.
                            .handle((payload, headers) -> payload)))
            .aggregate()
            .get();
}
curl -i -H "Content-type: application/json" http://localhost:8080/trigger
curl output:
[[21],[102],[23]]
Log:
2019-05-17 16:19:11.061 INFO 10148 --- [nio-8080-exec-1] o.s.integration.handler.LoggingHandler : Odd flow with payload: 1
2019-05-17 16:19:11.061 INFO 10148 --- [nio-8080-exec-1] o.s.integration.handler.LoggingHandler : At Odd flow end with payload: 21
2019-05-17 16:19:11.061 INFO 10148 --- [nio-8080-exec-1] o.s.integration.handler.LoggingHandler : Even flow with payload: 2
2019-05-17 16:19:11.061 INFO 10148 --- [nio-8080-exec-1] o.s.integration.handler.LoggingHandler : At Even flow end with payload: 102
2019-05-17 16:19:11.061 INFO 10148 --- [nio-8080-exec-1] o.s.integration.handler.LoggingHandler : Odd flow with payload: 3
2019-05-17 16:19:11.061 INFO 10148 --- [nio-8080-exec-1] o.s.integration.handler.LoggingHandler : At Odd flow end with payload: 23
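The split, route, and aggregate steps used throughout this thread can also be sketched in plain Java, which may help when checking what a flow is expected to produce. This is only an illustration built on java.util.concurrent, not how Spring Integration implements it. The class name SplitRouteAggregateSketch and the evenWorker/oddWorker helpers are hypothetical, and the arithmetic (+50 twice for even payloads, +10 twice for odd ones) is copied from the scatterGather example above.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class SplitRouteAggregateSketch {

    // Hypothetical workers that repeat the arithmetic of the two sub-flows.
    static int evenWorker(int payload) {
        return payload + 50 + 50;
    }

    static int oddWorker(int payload) {
        return payload + 10 + 10;
    }

    // Split the list, route each item by parity on the executor, then join in input order.
    static List<Integer> process(List<Integer> items, ExecutorService executor) {
        List<CompletableFuture<Integer>> futures = new ArrayList<>();
        for (int item : items) {
            futures.add(CompletableFuture.supplyAsync(
                    () -> item % 2 == 0 ? evenWorker(item) : oddWorker(item), executor));
        }
        List<Integer> results = new ArrayList<>();
        for (CompletableFuture<Integer> future : futures) {
            results.add(future.join()); // the "aggregate" step: wait for every reply
        }
        return results;
    }

    public static void main(String[] args) {
        ExecutorService executor = Executors.newCachedThreadPool();
        try {
            System.out.println(process(Arrays.asList(1, 2, 3), executor)); // prints [21, 102, 23]
        } finally {
            executor.shutdown();
        }
    }
}
```

The values match the curl output above, but the sketch returns a flat list; the nested lists in the real response presumably come from each scatter-gather collecting its own replies before the final aggregation.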