LambdaMessageProcessor doesn't recognize payload type for ConversionService
I am using Spring Integration 4.2.5.RELEASE and Spring Integration Java DSL 1.1.2.RELEASE.
I can't get custom conversion to work. I have registered a custom converter to convert from byte[] to my.object.MyClass:
    @Bean
    @IntegrationConverter
    public Converter<byte[], my.object.MyClass> bytesToMyClass() {
        return new Converter<byte[], my.object.MyClass>() {
            @Override
            public my.object.MyClass convert(byte[] source) {
                try {
                    // Parse the protobuf-encoded bytes into MyClass.
                    return my.object.MyClass.newBuilder().mergeFrom(source).build();
                } catch (InvalidProtocolBufferException e) {
                    throw new RuntimeException("Could not convert message.", e);
                }
            }
        };
    }
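Then I set up my integration flow. The goal is to route messages down one of two paths based on a variable named insertBuffer. If insertBuffer > 1, the messages are aggregated. Otherwise, the single message is just wrapped in a collection and sent to the service method. Here is my flow: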
    @Bean
    public IntegrationFlow routeInput(MessageChannel input, MyClassService service) {
        return IntegrationFlows.from(input)
                .<my.object.MyClass, Boolean>route((my.object.MyClass payload) -> insertBuffer > 1, mapping -> mapping
                        .subFlowMapping("true", aggregateflow -> aggregateflow
                                .<my.object.MyClass, Collection<my.object.MyClass>>aggregate(a -> a
                                        .correlationStrategy(message -> 0) // all messages are part of the same group for now.
                                        .releaseStrategy(group -> group.size() >= insertBuffer)
                                        .sendPartialResultOnExpiry(true)
                                        .expireGroupsUponCompletion(true)
                                        .expireGroupsUponTimeout(true)
                                        .groupTimeout(2000)))
                        .subFlowMapping("false", single -> single
                                .<my.object.MyClass, Collection<my.object.MyClass>>transform(Arrays::asList)
                        ))
                .handle(Collection.class, (payload, headers) ->
                        service.saveResult(payload))
                .get();
    }
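However, when I try to run this, I get the following exception: java.lang.ClassCastException: [B cannot be cast to my.object.MyClass (full stack trace below).
After some debugging, I noticed that when org.springframework.integration.dsl.LambdaMessageProcessor#processMessage tries to process the message, payloadType is java.lang.Object, whereas I think it should be my.object.MyClass.
All my generics appear to be correct, so what am I missing?
Full stack trace: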
ERROR o.s.i.handler.LoggingHandler - org.springframework.messaging.MessageHandlingException: nested exception is java.lang.ClassCastException: [B cannot be cast to my.object.MyClass
at org.springframework.integration.dsl.LambdaMessageProcessor.processMessage(LambdaMessageProcessor.java:130)
at org.springframework.integration.router.AbstractMessageProcessingRouter.getChannelKeys(AbstractMessageProcessingRouter.java:80)
at org.springframework.integration.router.AbstractMappingMessageRouter.determineTargetChannels(AbstractMappingMessageRouter.java:148)
at org.springframework.integration.router.AbstractMessageRouter.handleMessageInternal(AbstractMessageRouter.java:154)
at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:127)
at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:116)
at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:147)
at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:120)
at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:77)
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:442)
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:392)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:115)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:45)
at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:105)
at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutput(AbstractMessageProducingHandler.java:231)
at org.springframework.integration.handler.AbstractMessageProducingHandler.produceOutput(AbstractMessageProducingHandler.java:154)
at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutputs(AbstractMessageProducingHandler.java:102)
at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:105)
at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:127)
at org.springframework.integration.endpoint.PollingConsumer.handleMessage(PollingConsumer.java:103)
at org.springframework.integration.endpoint.AbstractPollingEndpoint.doPoll(AbstractPollingEndpoint.java:251)
at org.springframework.integration.endpoint.AbstractPollingEndpoint.access$000(AbstractPollingEndpoint.java:57)
at org.springframework.integration.endpoint.AbstractPollingEndpoint$1.call(AbstractPollingEndpoint.java:176)
at org.springframework.integration.endpoint.AbstractPollingEndpoint$1.call(AbstractPollingEndpoint.java:173)
at org.springframework.integration.endpoint.AbstractPollingEndpoint$Poller.run(AbstractPollingEndpoint.java:330)
at org.springframework.integration.util.ErrorHandlingTaskExecutor$1.run(ErrorHandlingTaskExecutor.java:55)
at org.springframework.core.task.SyncTaskExecutor.execute(SyncTaskExecutor.java:50)
at org.springframework.integration.util.ErrorHandlingTaskExecutor.execute(ErrorHandlingTaskExecutor.java:51)
at org.springframework.integration.endpoint.AbstractPollingEndpoint$Poller.run(AbstractPollingEndpoint.java:324)
at org.springframework.scheduling.support.DelegatingErrorHandlingRunnable.run(DelegatingErrorHandlingRunnable.java:54)
at org.springframework.scheduling.concurrent.ReschedulingRunnable.run(ReschedulingRunnable.java:81)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.ClassCastException: [B cannot be cast to my.object.MyClass
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:497)
at org.springframework.integration.dsl.LambdaMessageProcessor.processMessage(LambdaMessageProcessor.java:127)
... 37 more
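The problem is that the generic type of a lambda can't be determined at runtime; see "Java: how to resolve generic type of lambda parameter?"
That's why we have an overloaded method for exactly this case, which takes the expected payload type explicitly:
    .<my.object.MyClass, Boolean>route(my.object.MyClass.class, payload -> insertBuffer > 1, mapping -> mapping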