Configure Spring Integration Aggregator via Java DSL under JDK 7
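This is my first time configuring Spring Integration via the Java DSL under Java 7. Lambda expressions are only available from Java 8 onward, so I followed the examples in "Spring Integration Java DSL" and "Spring Integration Java DSL (pre Java 8): Line by line tutorial" and came up with the configuration below. It is meant to collect every 100 messages that share the same resource and send them to a remote RESTful service.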
@Bean
public IntegrationFlow rawDataParsingAndSendingFlow(
        @Autowired HttpRequestExecutingMessageHandler httpOutboundAdapter,
        @Autowired @Qualifier("headerEnricher") HeaderEnricher headerEnricher) {
    return IntegrationFlows.from("rawStringParsingRequestChannel")
            .transform(new RawStringToCheckDataMessageTransformer())
            .transform(new DataMessageToDtoTransformer())
            .aggregate(new Consumer<AggregatorSpec>() {
                @Override
                public void accept(AggregatorSpec aggregatorSpec) {
                    aggregatorSpec.processor(new SimpleMessageGroupProcessor(), null)
                            .correlationStrategy(new HeaderAttributeCorrelationStrategy("resource"))
                            .releaseStrategy(new MessageCountReleaseStrategy(100))
                            .sendPartialResultOnExpiry(true)
                            .groupTimeoutExpression("60000");
                }
            })
            .transform(headerEnricher)
            .transform(new ObjectToJsonTransformer())
            .handle(httpOutboundAdapter)
            .get();
}
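As a rough illustration of what this flow is meant to do (group by the "resource" header, release a group once it holds a fixed number of messages, and hand back a partial group on expiry), here is a framework-free model in Java 7 syntax. It is not how Spring Integration implements its aggregator; `ResourceBatcher`, `add` and `expire` are hypothetical names, and the timeout is modeled by an explicit `expire` call rather than a timer.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * Hypothetical model of "correlate by key, release at N".
 * Not Spring Integration code; single-threaded for simplicity.
 */
class ResourceBatcher {

    private final int releaseSize;
    private final Map<String, List<Object>> groups = new HashMap<String, List<Object>>();

    public ResourceBatcher(int releaseSize) {
        this.releaseSize = releaseSize;
    }

    /**
     * Adds a payload to the group for the given resource.
     * Returns the completed batch once the group reaches releaseSize, otherwise null.
     */
    public List<Object> add(String resource, Object payload) {
        List<Object> group = groups.get(resource);
        if (group == null) {
            group = new ArrayList<Object>();
            groups.put(resource, group);
        }
        group.add(payload);
        if (group.size() < releaseSize) {
            return null; // group not complete yet
        }
        groups.remove(resource); // the next message starts a fresh group
        return group;
    }

    /** Models expiry with partial results: returns whatever was collected so far. */
    public List<Object> expire(String resource) {
        List<Object> group = groups.remove(resource);
        return group == null ? new ArrayList<Object>() : group;
    }
}
```

With a release size of 100, the hundredth `add` call for a given resource returns the full batch, which corresponds to the point where the flow would forward the aggregated message downstream.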
However, this configuration does not work for me; it throws the following exception.
Exception in thread "main" java.lang.IllegalStateException: Failed to process message list
at org.springframework.integration.aggregator.MethodInvokingMessageListProcessor.process(MethodInvokingMessageListProcessor.java:79)
at org.springframework.integration.aggregator.MethodInvokingMessageGroupProcessor.aggregatePayloads(MethodInvokingMessageGroupProcessor.java:86)
at org.springframework.integration.aggregator.AbstractAggregatingMessageGroupProcessor.processMessageGroup(AbstractAggregatingMessageGroupProcessor.java:84)
at org.springframework.integration.dsl.AggregatorSpec$MessageGroupProcessorWrapper.processMessageGroup(AggregatorSpec.java:127)
at org.springframework.integration.aggregator.AbstractCorrelatingMessageHandler.completeGroup(AbstractCorrelatingMessageHandler.java:665)
at org.springframework.integration.aggregator.AbstractCorrelatingMessageHandler.handleMessageInternal(AbstractCorrelatingMessageHandler.java:418)
at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:127)
at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:116)
at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:148)
at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:121)
at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:89)
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:423)
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:373)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:115)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:45)
at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:105)
at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutput(AbstractMessageProducingHandler.java:358)
at org.springframework.integration.handler.AbstractMessageProducingHandler.produceOutput(AbstractMessageProducingHandler.java:269)
at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutputs(AbstractMessageProducingHandler.java:186)
at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:115)
at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:127)
at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:116)
at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:148)
at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:121)
at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:89)
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:423)
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:373)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:115)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:45)
at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:105)
at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutput(AbstractMessageProducingHandler.java:358)
at org.springframework.integration.handler.AbstractMessageProducingHandler.produceOutput(AbstractMessageProducingHandler.java:269)
at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutputs(AbstractMessageProducingHandler.java:186)
at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:115)
at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:127)
at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:116)
at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:148)
at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:121)
at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:89)
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:423)
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:373)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:115)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:45)
at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:105)
at org.springframework.messaging.core.AbstractMessageSendingTemplate.convertAndSend(AbstractMessageSendingTemplate.java:143)
at org.springframework.messaging.core.AbstractMessageSendingTemplate.convertAndSend(AbstractMessageSendingTemplate.java:135)
at org.springframework.integration.gateway.MessagingGatewaySupport.send(MessagingGatewaySupport.java:392)
at org.springframework.integration.gateway.GatewayProxyFactoryBean.invokeGatewayMethod(GatewayProxyFactoryBean.java:477)
at org.springframework.integration.gateway.GatewayProxyFactoryBean.doInvoke(GatewayProxyFactoryBean.java:429)
at org.springframework.integration.gateway.GatewayProxyFactoryBean.invoke(GatewayProxyFactoryBean.java:420)
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:179)
at org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:213)
at com.sun.proxy.$Proxy45.sendRawData(Unknown Source)
at prototype.healthcloud.historic.data.pusher.HistoricDataRetriever.extractData(HistoricDataRetriever.java:82)
at prototype.healthcloud.historic.data.pusher.HistoricDataRetriever.extractData(HistoricDataRetriever.java:68)
at org.springframework.jdbc.core.JdbcTemplate.doInPreparedStatement(JdbcTemplate.java:697)
at org.springframework.jdbc.core.JdbcTemplate.execute(JdbcTemplate.java:633)
at org.springframework.jdbc.core.JdbcTemplate.query(JdbcTemplate.java:684)
at org.springframework.jdbc.core.JdbcTemplate.query(JdbcTemplate.java:716)
at org.springframework.jdbc.core.JdbcTemplate.query(JdbcTemplate.java:726)
at prototype.healthcloud.historic.data.pusher.HistoricDataRetriever.retrieveHistoricData(HistoricDataRetriever.java:92)
at prototype.healthcloud.historic.data.pusher.Application.main(Application.java:119)
Caused by: org.springframework.expression.AccessException: Unable to access property 'payload' through getter method
at org.springframework.expression.spel.support.ReflectivePropertyAccessor$OptimalPropertyAccessor.read(ReflectivePropertyAccessor.java:640)
at org.springframework.expression.spel.ast.PropertyOrFieldReference.readProperty(PropertyOrFieldReference.java:211)
at org.springframework.expression.spel.ast.PropertyOrFieldReference.getValueInternal(PropertyOrFieldReference.java:94)
at org.springframework.expression.spel.ast.PropertyOrFieldReference.getValueInternal(PropertyOrFieldReference.java:81)
at org.springframework.expression.spel.ast.MethodReference.getArguments(MethodReference.java:154)
at org.springframework.expression.spel.ast.MethodReference.getValueRef(MethodReference.java:71)
at org.springframework.expression.spel.ast.CompoundExpression.getValueRef(CompoundExpression.java:66)
at org.springframework.expression.spel.ast.CompoundExpression.getValueInternal(CompoundExpression.java:87)
at org.springframework.expression.spel.ast.SpelNodeImpl.getTypedValue(SpelNodeImpl.java:131)
at org.springframework.expression.spel.standard.SpelExpression.getValue(SpelExpression.java:330)
at org.springframework.integration.util.AbstractExpressionEvaluator.evaluateExpression(AbstractExpressionEvaluator.java:169)
at org.springframework.integration.util.MessagingMethodInvokerHelper.processInternal(MessagingMethodInvokerHelper.java:319)
at org.springframework.integration.util.MessagingMethodInvokerHelper.process(MessagingMethodInvokerHelper.java:160)
at org.springframework.integration.aggregator.MethodInvokingMessageListProcessor.process(MethodInvokingMessageListProcessor.java:73)
... 61 more
Caused by: java.lang.reflect.InvocationTargetException
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at org.springframework.expression.spel.support.ReflectivePropertyAccessor$OptimalPropertyAccessor.read(ReflectivePropertyAccessor.java:636)
... 74 more
Caused by: java.lang.IllegalStateException: Invalid method parameter for payload: was expecting collection.
at org.springframework.util.Assert.state(Assert.java:70)
at org.springframework.integration.util.MessagingMethodInvokerHelper$ParametersWrapper.getPayload(MessagingMethodInvokerHelper.java:920)
... 79 more
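The root cause is in the generateExpression method of o.s.i.u.MessagingMethodInvokerHelper$HandlerMethod: annotationType is null, and the parameterType o.s.i.s.MessageGroup is neither a sub-interface of Collection or Collection<Message<?>> nor an array of a sub-interface of Collection<Message<?>>, so the expression is set to "#target.processMessageGroup(payload)". I guess an extra logic block would have to be added to handle the MessageGroup type correctly (not sure).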
Since my aggregation logic is very simple, I found a workaround by specifying an outputExpression, as follows.
@Bean
public IntegrationFlow rawDataParsingAndSendingFlow(
        @Autowired HttpRequestExecutingMessageHandler httpOutboundAdapter,
        @Autowired @Qualifier("headerEnricher") HeaderEnricher headerEnricher) {
    return IntegrationFlows.from("rawStringParsingRequestChannel")
            .transform(new RawStringToCheckDataMessageTransformer())
            .transform(new DataMessageToDtoTransformer())
            .aggregate(new Consumer<AggregatorSpec>() {
                @Override
                public void accept(AggregatorSpec aggregatorSpec) {
                    aggregatorSpec.outputExpression("#this.![payload]")
                            .correlationStrategy(new HeaderAttributeCorrelationStrategy("resource"))
                            .releaseStrategy(new MessageCountReleaseStrategy(100))
                            .sendPartialResultOnExpiry(true)
                            .groupTimeoutExpression("60000");
                }
            })
            .transform(headerEnricher)
            .transform(new ObjectToJsonTransformer())
            .handle(httpOutboundAdapter)
            .get();
}
So far the workaround works for me, but my question is how to configure the processor when the aggregation logic is complex.
aggregatorSpec.processor(new SimpleMessageGroupProcessor(), null)
You can't use a concrete MessageGroupProcessor in that method; it expects a POJO bean and a method name (the name can be null if the bean has only one eligible method).
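For reference, a POJO of the kind that method expects could look like the sketch below. `PayloadBatchAggregator` and `aggregate` are hypothetical names, and the de-duplication is only a stand-in for whatever "complex" aggregation logic is needed. The sketch assumes the framework binds the payloads of the released group to a plain collection parameter.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;

/**
 * Hypothetical POJO aggregator: no Spring types involved.
 * Package-private here only to keep the sketch in one file.
 */
class PayloadBatchAggregator {

    /**
     * Receives the payloads of one released group and returns the batch to send.
     * Placeholder logic: drop duplicate payloads, keep first-arrival order.
     */
    public List<Object> aggregate(List<Object> payloads) {
        // LinkedHashSet removes duplicates while preserving insertion order.
        return new ArrayList<Object>(new LinkedHashSet<Object>(payloads));
    }
}
```

Following the two-argument call shown in the question, this would presumably be wired as `aggregatorSpec.processor(new PayloadBatchAggregator(), "aggregate")`; the method name could be passed as null here because the class has only one candidate method.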
To plug in a concrete MessageGroupProcessor, use outputProcessor instead:
aggregatorSpec.outputProcessor(new SimpleMessageGroupProcessor())
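Note that the output of that processor will be the message group, which may not be what you want. You may want to consider using DefaultAggregatingMessageGroupProcessor (which is the default if you don't supply an outputProcessor).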