Camel Multicast; Direct and AMQ
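I have a generic Camel route that listens for a CXF:Bean (SOAP) request, with the data format set to POJO.
What I want this route to do is multicast the incoming exchange to two endpoints: an AMQ JMS queue, which will later be consumed by another internal route, and a direct: endpoint, which is used to reply with a SOAP message, i.e. "Success".
However, I get the following error and stack trace: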
015-06-03 16:56:49.645 INFO 4992 --- [qtp110495750-79] XxToYyyAsynchronizerRoute : An error has occurred. java.lang.RuntimeException: xxx.zzz.yyy.xx_to_zzz.RequestType
at org.apache.activemq.command.ActiveMQObjectMessage.storeContent(ActiveMQObjectMessage.java:118)
at org.apache.activemq.command.ActiveMQObjectMessage.setObject(ActiveMQObjectMessage.java:169)
at org.apache.camel.component.jms.JmsBinding.createJmsMessageForType(JmsBinding.java:561)
at org.apache.camel.component.jms.JmsBinding.createJmsMessage(JmsBinding.java:473)
at org.apache.camel.component.jms.JmsBinding.makeJmsMessage(JmsBinding.java:289)
at org.apache.camel.component.jms.JmsProducer.createMessage(JmsProducer.java:297)
at org.apache.camel.component.jms.JmsConfiguration$CamelJmsTemplate.doSendToDestination(JmsConfiguration.java:274)
at org.apache.camel.component.jms.JmsConfiguration$CamelJmsTemplate.access0(JmsConfiguration.java:217)
at org.apache.camel.component.jms.JmsConfiguration$CamelJmsTemplate.doInJms(JmsConfiguration.java:231)
at org.springframework.jms.core.JmsTemplate.execute(JmsTemplate.java:493)
at org.apache.camel.component.jms.JmsConfiguration$CamelJmsTemplate.send(JmsConfiguration.java:228)
at org.apache.camel.component.jms.JmsProducer.doSend(JmsProducer.java:431)
at org.apache.camel.component.jms.JmsProducer.processInOnly(JmsProducer.java:385)
at org.apache.camel.component.jms.JmsProducer.process(JmsProducer.java:153)
at org.apache.camel.processor.SendProcessor.process(SendProcessor.java:129)
at org.apache.camel.management.InstrumentationProcessor.process(InstrumentationProcessor.java:77)
at org.apache.camel.processor.RedeliveryErrorHandler.process(RedeliveryErrorHandler.java:448)
at org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:191)
at org.apache.camel.processor.Pipeline.process(Pipeline.java:118)
at org.apache.camel.processor.Pipeline.process(Pipeline.java:80)
at org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:191)
at org.apache.camel.component.cxf.CxfConsumer.asyncInvoke(CxfConsumer.java:95)
at org.apache.camel.component.cxf.CxfConsumer.invoke(CxfConsumer.java:75)
at org.apache.cxf.interceptor.ServiceInvokerInterceptor.run(ServiceInvokerInterceptor.java:59)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at org.apache.cxf.interceptor.ServiceInvokerInterceptor.run(ServiceInvokerInterceptor.java:126)
at org.apache.cxf.workqueue.SynchronousExecutor.execute(SynchronousExecutor.java:37)
at org.apache.cxf.interceptor.ServiceInvokerInterceptor.handleMessage(ServiceInvokerInterceptor.java:131)
at org.apache.cxf.phase.PhaseInterceptorChain.doIntercept(PhaseInterceptorChain.java:307)
at org.apache.cxf.transport.ChainInitiationObserver.onMessage(ChainInitiationObserver.java:121)
at org.apache.cxf.transport.http.AbstractHTTPDestination.invoke(AbstractHTTPDestination.java:251)
at org.apache.cxf.transport.http_jetty.JettyHTTPDestination.doService(JettyHTTPDestination.java:234)
at org.apache.cxf.transport.http_jetty.JettyHTTPHandler.handle(JettyHTTPHandler.java:70)
at org.eclipse.jetty.server.handler.ContextHandler.__doHandle(ContextHandler.java:1129)
at org.eclipse.jetty.server.handler.ContextHandler.doHandle(ContextHandler.java)
at org.eclipse.jetty.server.handler.ContextHandler.doScope(ContextHandler.java:1065)
at org.eclipse.jetty.server.handler.ScopedHandler.handle(ScopedHandler.java:141)
at org.eclipse.jetty.server.handler.ContextHandlerCollection.handle(ContextHandlerCollection.java:215)
at org.eclipse.jetty.server.handler.HandlerWrapper.handle(HandlerWrapper.java:97)
at org.eclipse.jetty.server.Server.handle(Server.java:497)
at org.eclipse.jetty.server.HttpChannel.handle(HttpChannel.java:310)
at org.eclipse.jetty.server.HttpConnection.onFillable(HttpConnection.java:257)
at org.eclipse.jetty.io.AbstractConnection.run(AbstractConnection.java:540)
at org.eclipse.jetty.util.thread.QueuedThreadPool.runJob(QueuedThreadPool.java:635)
at org.eclipse.jetty.util.thread.QueuedThreadPool.run(QueuedThreadPool.java:555)
at java.lang.Thread.run(Thread.java:745)
My route is defined as follows:
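// Enclosing RouteBuilder class omitted; the method signature below is assumed.
public void configure() throws Exception {
    // Log any exception at INFO and mark it as handled.
    onException(Exception.class)
        .handled(true)
        .log(LoggingLevel.INFO, "An error has occurred. ${exception.stacktrace}");

    // Fan the incoming SOAP request out to the JMS queue and the generic route.
    from("{{xyz.route.zzz.soap.endpoint}}")
        .routeId(xx_TO_zz_ROUTE_ID)
        .log(LoggingLevel.INFO, "Placing messages into generic queue!")
        .multicast().parallelProcessing().to("{{xxx.yy.to.zzz.jms.queue}}", "{{aaaa.xxx.generic.route}}");
}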
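For testing purposes, if I remove the queue endpoint and send only to the generic route, I get a SOAP response with "SUCCESS". I suspect that either I am not using multicast correctly, or an AMQ (JMS) queue cannot accept the POJO data format?
Can anyone help? :|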
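The top frames of the trace (ActiveMQObjectMessage.storeContent) suggest that the body is being Java-serialized into a JMS ObjectMessage, and the exception message is nothing but a class name, which is what java.io.NotSerializableException carries. The following is a minimal sketch in plain Java, without Camel or ActiveMQ, that reproduces that kind of failure. PlainRequest and SerializationProbe are hypothetical names used only for illustration; PlainRequest stands in for the generated RequestType.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;

// Hypothetical stand-in for the generated request class:
// a plain POJO that does NOT implement java.io.Serializable.
class PlainRequest {
    String payload = "hello";
}

class SerializationProbe {
    // Returns null if the object can be Java-serialized,
    // otherwise the message of the exception that was thrown.
    static String trySerialize(Object body) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(body);
            return null;
        } catch (IOException e) {
            // For NotSerializableException the message is the offending class name.
            return e.getMessage();
        }
    }

    public static void main(String[] args) {
        // A non-Serializable POJO fails; the message is its class name.
        System.out.println(trySerialize(new PlainRequest()));
        // A String is Serializable, so this succeeds and yields null.
        System.out.println(trySerialize("a plain String"));
    }
}
```

If this is what is happening in the route, multicast itself is probably not at fault: the direct: branch never serializes the body, while the JMS branch has to.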
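OK, so I took the lead from Cyaegha (thanks): I generated the POJOs with wsdl2java and applied the answer linked below.
Solution for wsdl2java serialization
The generated POJOs now implement the Serializable interface and can be sent to the queue!
Happy days! =)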
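For anyone landing here later, this is a rough sketch of what the fix amounts to, again in plain Java rather than Camel: once the class implements java.io.Serializable, it survives the object-stream round trip that an ObjectMessage relies on. OrderRequest and RoundTrip are hypothetical names for illustration, not the classes that wsdl2java actually generates.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

// Hypothetical request POJO; the only relevant detail is "implements Serializable".
class OrderRequest implements Serializable {
    private static final long serialVersionUID = 1L;
    final String payload;

    OrderRequest(String payload) {
        this.payload = payload;
    }
}

class RoundTrip {
    // Serialize the object to bytes, roughly what the producer side must do.
    static byte[] toBytes(Serializable body) throws IOException {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(buffer)) {
            out.writeObject(body);
        }
        return buffer.toByteArray();
    }

    // Rebuild the object from bytes, roughly what the consuming side must do.
    static Object fromBytes(byte[] data) throws IOException, ClassNotFoundException {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(data))) {
            return in.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        OrderRequest copy = (OrderRequest) fromBytes(toBytes(new OrderRequest("Success")));
        System.out.println(copy.payload);
    }
}
```

Note that every field reachable from the POJO also has to be serializable (or marked transient), and the consuming route needs the same class on its classpath.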