Camel Spring remoting with Artemis Proton/Qpid client - hangs sending message
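- I am sending messages using the Camel Spring remoting configuration; the producer and the consumer run in different JVMs.
- Using Apache Artemis version 2.14.0
- Camel version 2.20.0, Qpid 0.54.0, pooled-jms 1.1.1
I am using the LoadMessageSupport class to push the message. I can see that the Camel route is invoked, and the debug log messages are below.
I noticed that a producer session is opened in the Artemis console.
Any clues on how to debug this, or what could be causing the issue?
There are some Netty-related debug errors, which I safely ignored.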
...
DEBUG [main] (DefaultManagementAgent.java:470) - Registered MBean with ObjectName: org.apache.camel:context=camel,type=components,name="bean"
DEBUG [main] (DefaultComponent.java:266) - Cannot resolve property placeholders on component: org.apache.camel.component.bean.BeanComponent@cda0432 as PropertiesComponent is not in use
DEBUG [main] (AbstractAutowireCapableBeanFactory.java:448) - Creating instance of bean 'org.apache.camel.component.jackson.converter.JacksonTypeConverters'
DEBUG [main] (AbstractAutowireCapableBeanFactory.java:484) - Finished creating instance of bean 'org.apache.camel.component.jackson.converter.JacksonTypeConverters'
INFO [main] (CamelLogger.java:159) - ID-local-vm-1624040900482-0-1 >>> (processMessage) from(direct://proxy-msg-handler) --> log[Log message on incoming message with body] <<< Pattern:InOnly, Headers:{breadcrumbId=ID-local-vm-1624040900482-0-1}, BodyType:org.apache.camel.component.bean.BeanInvocation, Body:BeanInvocation public abstract void com.myexample.MessageHandler.processMessage(com.myexample.MessageType,java.lang.String) with [ITEM_DESCRIPTION, {"info": " my name"}]]
DEBUG [main] (CamelLogger.java:153) - Log message on incoming message with body
INFO [main] (CamelLogger.java:159) - ID-local-vm-1624040900482-0-1 >>> (SubmitNotificationEvent) log[Log message on incoming message with body] --> amqpcomponent://queue:message.queue <<< Pattern:InOnly, Headers:{breadcrumbId=ID-local-vm-1624040900482-0-1}, BodyType:org.apache.camel.component.bean.BeanInvocation, Body:BeanInvocation public abstract void com.myexample.MessageHandler.processMessage(com.myexample.MessageType,java.lang.String) with [ITEM_DESCRIPTION, {"info": " my name"}]]
DEBUG [main] (SendProcessor.java:147) - >>>> service-event-queue://queue:message.queue Exchange[ID-local-vm-1624040900482-0-1]
DEBUG [main] (InternalLoggerFactory.java:45) - Using SLF4J as the default logging framework
...
DEBUG [AmqpProvider :(1):[amqp://localhost:5672]] (AmqpConnectionBuilder.java:84) - AmqpConnection { ID:6d0c8673-6a92-401d-a239-12ec696fc9d3:1 } is now open:
INFO [AmqpProvider :(1):[amqp://localhost:5672]] (JmsConnection.java:1339) - Connection ID:6d0c8673-6a92-401d-a239-12ec696fc9d3:1 connected to server: amqp://localhost:5672
DEBUG [main] (JmsTemplate.java:492) - Executing callback on JMS Session: JmsPoolSession { org.apache.qpid.jms.JmsSession@7fd26ad8 }
DEBUG [AmqpProvider :(1):[amqp://localhost:5672]] (AmqpProducerBuilder.java:68) - Creating AmqpFixedProducer for: null
DEBUG [main] (JmsConfiguration.java:622) - Sending JMS message to: message.queue with message: JmsObjectMessageFacade
After enabling TRACE-level logging, I noticed the messages below:
DEBUG [main] (JmsConfiguration.java:622) - Sending JMS message to: message.queue with message: JmsObjectMessageFacade { org.apache.qpid.jms.provider.amqp.message.AmqpJmsObjectMessageFacade@36cc9385 }
TRACE [AmqpProvider :(1):[amqp://localhost:5672]] (AmqpFixedProducer.java:100) - Holding Message send until credit is available.
TRACE [AmqpProvider :(1):[amqp://localhost:5672]] (AmqpProvider.java:1625) - IdleTimeoutCheck rescheduling with delay: 15000
TRACE [AmqpProvider :(1):[amqp://localhost:5672]] (NettyTcpTransport.java:560) - New incoming data read: PooledUnsafeDirectByteBuf(ridx: 0, widx: 8, cap: 65536)
TRACE [AmqpProvider :(1):[amqp://localhost:5672]] (AmqpProtocolTracer.java:49) - [1673389762:0] RECV: Empty Frame
TRACE [AmqpProvider :(1):[amqp://localhost:5672]] (AmqpProtocolTracer.java:54) - [1673389762:0] SENT: Empty Frame
TRACE [AmqpProvider :(1):[amqp://localhost:5672]] (NettyTcpTransport.java:259) - Attempted write of buffer: PooledUnsafeDirectByteBuf(ridx: 0, widx: 8, cap: 8/8)
TRACE [AmqpProvider :(1):[amqp://localhost:5672]] (NettyTcpTransport.java:273) - Attempted flush of pending writes
TRACE [AmqpProvider :(1):[amqp://localhost:5672]] (AmqpProvider.java:1625) - IdleTimeoutCheck rescheduling with delay: 15000
TRACE [AmqpProvider :(1):[amqp://localhost:5672]] (NettyTcpTransport.java:560) - New incoming data read: PooledUnsafeDirectByteBuf(ridx: 0, widx: 8, cap: 65536)
TRACE [AmqpProvider :(1):[amqp://localhost:5672]] (AmqpProtocolTracer.java:49) - [1673389762:0] RECV: Empty Frame
TRACE [AmqpProvider :(1):[amqp://localhost:5672]] (AmqpProtocolTracer.java:54) - [1673389762:0] SENT: Empty Frame
TRACE [AmqpProvider :(1):[amqp://localhost:5672]] (NettyTcpTransport.java:259) - Attempted write of buffer: PooledUnsafeDirectByteBuf(ridx: 0, widx: 8, cap: 8/8)
TRACE [AmqpProvider :(1):[amqp://localhost:5672]] (NettyTcpTransport.java:273) - Attempted flush of pending writes
TRACE [AmqpProvider :(1):[amqp://localhost:5672]] (AmqpProvider.java:1625) - IdleTimeoutCheck rescheduling with delay: 15000
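For readers unfamiliar with the "Holding Message send until credit is available" line: in AMQP 1.0 a sender may only transfer a message once the receiving peer (here, the broker) has granted link credit. The toy model below is only a sketch of that idea using a Semaphore; the class and method names are hypothetical and this is not Qpid or Artemis code. A broker may withhold credit when it is short on resources, but that is an assumption about this case, not something the trace proves.

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

/** Toy model of AMQP link credit (hypothetical names, not Qpid or Artemis code). */
final class CreditGateSketch {
    // Each permit stands for one unit of link credit granted by the receiving peer.
    private final Semaphore credit = new Semaphore(0);

    /** Receiver side: allow the sender to transfer n more messages. */
    void grantCredit(int n) {
        credit.release(n);
    }

    /** Sender side: returns true only if credit arrived within the timeout. */
    boolean trySend(String body, long timeoutMillis) {
        try {
            // With zero credit the message is held, as in the TRACE line above.
            return credit.tryAcquire(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        CreditGateSketch link = new CreditGateSketch();
        System.out.println("sent without credit: " + link.trySend("m1", 100));
        link.grantCredit(1);
        System.out.println("sent after credit:   " + link.trySend("m1", 100));
    }
}
```

In the trace above only empty (heartbeat) frames are exchanged after the hold, which is consistent with the sender still waiting for credit.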
- Below is the context XML I use to send the message from the Java class.
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:camel="http://camel.apache.org/schema/spring"
xmlns:util="http://www.springframework.org/schema/util"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd
http://camel.apache.org/schema/spring http://camel.apache.org/schema/spring/camel-spring.xsd
http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd
http://www.springframework.org/schema/util http://www.springframework.org/schema/util/spring-util.xsd">
<bean id="jmsConnectionFactory" class="org.apache.qpid.jms.JmsConnectionFactory">
<property name="remoteURI" value="amqp://localhost:5672?amqp.traceFrames=true"/>
</bean>
<bean id="jpcf" class="org.messaginghub.pooled.jms.JmsPoolConnectionFactory" init-method="start" destroy-method="stop" >
<property name="maxConnections" value="3" />
<property name="connectionFactory" ref="jmsConnectionFactory" />
</bean>
<bean id="jmsConfig" class="org.apache.camel.component.jms.JmsConfiguration">
<property name="connectionFactory" ref="jpcf" />
<property name="concurrentConsumers" value="3" />
</bean>
<bean id="amqpcomponent" class="org.apache.camel.component.amqp.AMQPComponent">
<property name="configuration" ref="jmsConfig" />
</bean>
<!-- Camel Spring Remoting Interface -->
<camel:proxy id="proxyObject" binding="false" serviceUrl="direct:proxy-msg-handler" serviceInterface="com.myexample.MessageHandler"/>
<!-- Bean that initialize the Spring Remoting for handling message -->
<bean id="BeanProxy" class="com.myexample.MessageProducer">
<property name="messageHandler" ref="proxyObject"/>
</bean>
<camelContext id="camel" xmlns="http://camel.apache.org/schema/spring" autoStartup="true" trace="true">
<camel:route autoStartup="true" id="processMessage">
<camel:from uri="direct:proxy-msg-handler"/>
<camel:log message="Log incoming message" logName="Incoming" loggingLevel="DEBUG"/>
<camel:inOnly uri="amqpcomponent:queue:message.queue"/>
</camel:route>
</camelContext>
</beans>
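One way to make the hang visible as an error instead of an indefinite block might be to add the Qpid JMS option jms.sendTimeout to the remoteURI used above. The helper below is only a sketch of how such a URI could be assembled; the class name, method name and the 10000 ms value are illustrative assumptions, not part of the original setup.

```java
/** Sketch: append a Qpid JMS URI option to an existing AMQP connection URI. */
final class RemoteUriSketch {
    /** Uses '?' for the first option and '&' for any further option. */
    static String withOption(String uri, String key, String value) {
        String separator = uri.contains("?") ? "&" : "?";
        return uri + separator + key + "=" + value;
    }

    public static void main(String[] args) {
        String base = "amqp://localhost:5672?amqp.traceFrames=true";
        // 10000 ms is an arbitrary illustrative value, not a recommendation.
        System.out.println(withOption(base, "jms.sendTimeout", "10000"));
    }
}
```

If the resulting URI is pasted into the Spring XML, the & has to be written as &amp; inside the value attribute.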
- Java class that loads the context and invokes the remote Spring bean method.
- The Java class below is used to push the message to the Artemis queue.
package com.myexample;

import org.springframework.context.support.ClassPathXmlApplicationContext;

public class LoadMessageSupport {
    public static void main(String... strings) {
        ClassPathXmlApplicationContext appContext = null;
        try {
            appContext = new ClassPathXmlApplicationContext("file:/path/to/context/message-handler-context.xml");
            MessageProducer messageProducer = appContext.getBean(MessageProducer.class);
            String message = "{ \"itemDesc\" : \"test description\" }";
            System.out.println(message);
            // MessageType is an enum already defined within the project
            messageProducer.sendMessage(MessageType.ITEM_DESC, message);
            //System.exit(0);
        } catch (Exception exe) {
            System.out.println("Something wrong... ");
            exe.printStackTrace();
        } finally {
            // Closing the Spring context also stops the CamelContext it manages.
            if (appContext != null) {
                appContext.close();
                System.out.println("camel context stopped...");
            }
        }
    }
}
- Message handler interface (implemented by the receiver)
import org.apache.camel.InOnly;

@InOnly
public interface MessageHandler {
    void processMessage(MessageType type, Order order);
    void processMessage(MessageType type, String message); // the method I am trying to invoke
}
- Producer class
public class MessageProducer {
    // using the proxy object within the producer object
    // this will invoke the spring bean using remote (rmi)
    private MessageHandler messageHandler;

    protected MessageHandler getMessageHandler() {
        return this.messageHandler;
    }

    public void setMessageHandler(MessageHandler messageHandler) {
        this.messageHandler = messageHandler;
    }

    // constructor
    public MessageProducer() {}

    public void sendMessage(MessageType type, Order order) {
        getMessageHandler().processMessage(type, order);
    }

    public void sendMessage(MessageType type, String message) {
        getMessageHandler().processMessage(type, message);
    }
}
- Message receiver
import org.apache.camel.Handler;

public class MessageReceiver implements MessageHandler {
    @Handler
    public void processMessage(MessageType type, Order order) {
        System.out.println(" received type and ORDER info ...");
        // invoke methods for logical processing
    }

    @Handler
    public void processMessage(MessageType type, String message) {
        System.out.println(" received type and MESSAGE info for processing...");
        // invoke methods for logical processing
    }
}
When I tried earlier, it seems my VM did not have enough memory: free -h showed only 500MB left.
After restarting the VM, the message is now sent to the Artemis broker.
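To catch this earlier next time, the same check that free -h gives could be done from Java before sending. The sketch below reads MemAvailable from /proc/meminfo; it assumes a Linux host and that the check runs on the VM hosting the broker, and the class and method names are hypothetical.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.List;

/** Sketch: report available memory from /proc/meminfo-style lines (Linux only). */
final class MemAvailableSketch {
    /** Returns MemAvailable in MiB, or -1 if the field is absent. */
    static long availableMiB(List<String> meminfoLines) {
        for (String line : meminfoLines) {
            if (line.startsWith("MemAvailable:")) {
                // The line looks like "MemAvailable:     512000 kB".
                String[] parts = line.trim().split("\\s+");
                return Long.parseLong(parts[1]) / 1024;
            }
        }
        return -1;
    }

    public static void main(String[] args) throws IOException {
        Path meminfo = Paths.get("/proc/meminfo");
        long mib = Files.exists(meminfo) ? availableMiB(Files.readAllLines(meminfo)) : -1;
        System.out.println("MemAvailable (MiB): " + mib);
    }
}
```

What counts as "too low" depends on the broker configuration, so no threshold is hard-coded here.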