JmsTransactionManager affecting other messages?
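I'm trying to get client acknowledge mode working in JMS. I currently have a queue that works with auto acknowledge, and I don't know what else I need to do to make it use client acknowledge.
When I run the code I get no errors. It just seems to be operating on some other message (different IDs in the logs).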
applicationContext.xml
<bean id="amqConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
    <constructor-arg index="0" value="tcp://localhost:61616" />
</bean>
<bean id="connectionFactory"
      class="org.springframework.jms.connection.CachingConnectionFactory">
    <constructor-arg ref="amqConnectionFactory" />
</bean>
<bean id="defaultDestination" class="org.apache.activemq.command.ActiveMQQueue">
    <!-- name of the queue -->
    <constructor-arg index="0" value="MyQueue" />
</bean>
<bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
    <property name="connectionFactory" ref="connectionFactory" />
    <property name="defaultDestination" ref="defaultDestination" />
    <property name="sessionTransacted" value="true" />
    <property name="sessionAcknowledgeModeName" value="CLIENT_ACKNOWLEDGE" />
</bean>
<bean id="jmsTransactionManager" class="org.springframework.jms.connection.JmsTransactionManager">
    <property name="connectionFactory">
        <ref bean="amqConnectionFactory"/>
    </property>
</bean>
QueueJMSProxy.java
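import javax.jms.Message;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.connection.JmsTransactionManager;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.transaction.TransactionStatus;
import org.springframework.transaction.support.DefaultTransactionDefinition;

public class QueueJMSProxy {
    private JmsTemplate jmsTemplate;
    @Autowired
    private JmsTransactionManager jmsTransactionManager;

    @Autowired
    public QueueJMSProxy(JmsTemplate jmsTemplate) {
        this.jmsTemplate = jmsTemplate;
        jmsTemplate.setReceiveTimeout(BAGSchedulerCTE.RECEIVE_JMS_TIMEOUT);
    }

    public synchronized MyMessage receiveMessageFromBSG() {
        //1 begin a transaction through the transaction manager
        TransactionStatus status = jmsTransactionManager.getTransaction(new DefaultTransactionDefinition());
        //2 receive a message through the template and persist it
        Message receivedMessage = jmsTemplate.receive();
        MyMessage myMessage = saveInDB(receivedMessage);
        //3 roll back the transaction started in step 1
        jmsTransactionManager.rollback(status);
        return myMessage;
    }
}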
Logs for 1, 2 and 3
//1
2017-01-27_11:32:36,209 DEBUG main | transport.WireFormatNegotiator:82->sendWireFormat | Sending: WireFormatInfo { version=10, properties={MaxFrameSize=9223372036854775807, CacheSize=1024, CacheEnabled=true, Host=localhost, SizePrefixDisabled=false, MaxInactivityDurationInitalDelay=10000, TcpNoDelayEnabled=true, MaxInactivityDuration=30000, TightEncodingEnabled=true, StackTraceEnabled=true}, magic=[A,c,t,i,v,e,M,Q]}
2017-01-27_11:32:36,219 DEBUG ActiveMQ Transport: tcp://localhost/127.0.0.1:61616@60606 | transport.InactivityMonitor:92->configuredOk | Using min of local: WireFormatInfo { version=10, properties={MaxFrameSize=9223372036854775807, CacheSize=1024, CacheEnabled=true, Host=localhost, SizePrefixDisabled=false, MaxInactivityDurationInitalDelay=10000, TcpNoDelayEnabled=true, MaxInactivityDuration=30000, TightEncodingEnabled=true, StackTraceEnabled=true}, magic=[A,c,t,i,v,e,M,Q]} and remote: WireFormatInfo { version=10, properties={CacheSize=1024, MaxFrameSize=104857600, CacheEnabled=true, SizePrefixDisabled=false, TcpNoDelayEnabled=true, MaxInactivityDurationInitalDelay=10000, MaxInactivityDuration=30000, TightEncodingEnabled=true, StackTraceEnabled=true}, magic=[A,c,t,i,v,e,M,Q]}
2017-01-27_11:32:36,224 DEBUG ActiveMQ Transport: tcp://localhost/127.0.0.1:61616@60606 | transport.WireFormatNegotiator:118->negociate | Received WireFormat: WireFormatInfo { version=10, properties={CacheSize=1024, MaxFrameSize=104857600, CacheEnabled=true, SizePrefixDisabled=false, TcpNoDelayEnabled=true, MaxInactivityDurationInitalDelay=10000, MaxInactivityDuration=30000, TightEncodingEnabled=true, StackTraceEnabled=true}, magic=[A,c,t,i,v,e,M,Q]}
2017-01-27_11:32:36,226 DEBUG ActiveMQ Transport: tcp://localhost/127.0.0.1:61616@60606 | transport.WireFormatNegotiator:125->negociate | tcp://localhost/127.0.0.1:61616@60606 before negotiation: OpenWireFormat{version=10, cacheEnabled=false, stackTraceEnabled=false, tightEncodingEnabled=false, sizePrefixDisabled=false, maxFrameSize=9223372036854775807}
2017-01-27_11:32:36,226 DEBUG ActiveMQ Transport: tcp://localhost/127.0.0.1:61616@60606 | transport.WireFormatNegotiator:140->negociate | tcp://localhost/127.0.0.1:61616@60606 after negotiation: OpenWireFormat{version=10, cacheEnabled=true, stackTraceEnabled=true, tightEncodingEnabled=true, sizePrefixDisabled=false, maxFrameSize=104857600}
//2
2017-01-27_11:32:40,851 DEBUG main | transport.WireFormatNegotiator:82->sendWireFormat | Sending: WireFormatInfo { version=10, properties={MaxFrameSize=9223372036854775807, CacheSize=1024, CacheEnabled=true, Host=localhost, SizePrefixDisabled=false, MaxInactivityDurationInitalDelay=10000, TcpNoDelayEnabled=true, MaxInactivityDuration=30000, TightEncodingEnabled=true, StackTraceEnabled=true}, magic=[A,c,t,i,v,e,M,Q]}
2017-01-27_11:32:40,852 INFO main | connection.CachingConnectionFactory:311->initConnection | Established shared JMS Connection: ActiveMQConnection {id=ID:LATES-0008-60605-1485513156084-1:2,clientId=null,started=false}
2017-01-27_11:32:40,856 DEBUG ActiveMQ Transport: tcp://localhost/127.0.0.1:61616@60607 | transport.InactivityMonitor:92->configuredOk | Using min of local: WireFormatInfo { version=10, properties={MaxFrameSize=9223372036854775807, CacheSize=1024, CacheEnabled=true, Host=localhost, SizePrefixDisabled=false, MaxInactivityDurationInitalDelay=10000, TcpNoDelayEnabled=true, MaxInactivityDuration=30000, TightEncodingEnabled=true, StackTraceEnabled=true}, magic=[A,c,t,i,v,e,M,Q]} and remote: WireFormatInfo { version=10, properties={CacheSize=1024, MaxFrameSize=104857600, CacheEnabled=true, SizePrefixDisabled=false, TcpNoDelayEnabled=true, MaxInactivityDurationInitalDelay=10000, MaxInactivityDuration=30000, TightEncodingEnabled=true, StackTraceEnabled=true}, magic=[A,c,t,i,v,e,M,Q]}
2017-01-27_11:32:40,856 DEBUG ActiveMQ Transport: tcp://localhost/127.0.0.1:61616@60607 | transport.WireFormatNegotiator:118->negociate | Received WireFormat: WireFormatInfo { version=10, properties={CacheSize=1024, MaxFrameSize=104857600, CacheEnabled=true, SizePrefixDisabled=false, TcpNoDelayEnabled=true, MaxInactivityDurationInitalDelay=10000, MaxInactivityDuration=30000, TightEncodingEnabled=true, StackTraceEnabled=true}, magic=[A,c,t,i,v,e,M,Q]}
2017-01-27_11:32:40,856 DEBUG ActiveMQ Transport: tcp://localhost/127.0.0.1:61616@60607 | transport.WireFormatNegotiator:125->negociate | tcp://localhost/127.0.0.1:61616@60607 before negotiation: OpenWireFormat{version=10, cacheEnabled=false, stackTraceEnabled=false, tightEncodingEnabled=false, sizePrefixDisabled=false, maxFrameSize=9223372036854775807}
2017-01-27_11:32:40,857 DEBUG ActiveMQ Transport: tcp://localhost/127.0.0.1:61616@60607 | transport.WireFormatNegotiator:140->negociate | tcp://localhost/127.0.0.1:61616@60607 after negotiation: OpenWireFormat{version=10, cacheEnabled=true, stackTraceEnabled=true, tightEncodingEnabled=true, sizePrefixDisabled=false, maxFrameSize=104857600}
2017-01-27_11:32:40,923 DEBUG ActiveMQ Transport: tcp://localhost/127.0.0.1:61616@60607 | thread.TaskRunnerFactory:91->init | Initialized TaskRunnerFactory[ActiveMQ Session Task] using ExecutorService: java.util.concurrent.ThreadPoolExecutor@799e037[Running, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 0]
2017-01-27_11:32:40,929 DEBUG main | activemq.TransactionContext:248->begin | Begin:TX:ID:LATES-0008-60605-1485513156084-1:2:1
2017-01-27_11:32:40,940 DEBUG main | activemq.ActiveMQSession:572->commit | ID:LATES-0008-60605-1485513156084-1:2:1 Transaction Commit :TX:ID:LATES-0008-60605-1485513156084-1:2:1
2017-01-27_11:32:40,941 DEBUG main | activemq.TransactionContext:317->commit | Commit: TX:ID:LATES-0008-60605-1485513156084-1:2:1 syncCount: 1
//3
2017-01-27_11:32:47,026 DEBUG main | activemq.ActiveMQSession:593->rollback | ID:LATES-0008-60605-1485513156084-1:1:1 Transaction Rollback, txid:null
2017-01-27_11:32:47,069 DEBUG main | util.ThreadPoolUtils:136->doShutdown | Shutdown of ExecutorService: java.util.concurrent.ThreadPoolExecutor@2a6a9ba5[Terminated, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 0] is shutdown: true and terminated: true took: 0.001 seconds.
2017-01-27_11:32:47,071 DEBUG main | tcp.TcpTransport:525->doStop | Stopping transport tcp://localhost/127.0.0.1:61616@60606
2017-01-27_11:32:47,074 DEBUG main | thread.TaskRunnerFactory:91->init | Initialized TaskRunnerFactory[ActiveMQ Task] using ExecutorService: java.util.concurrent.ThreadPoolExecutor@316ed8bb[Running, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 0]
2017-01-27_11:32:47,076 DEBUG ActiveMQ Task-1 | tcp.TcpTransport:543->run | Closed socket Socket[addr=localhost/127.0.0.1,port=61616,localport=60606]
2017-01-27_11:32:47,076 DEBUG main | util.ThreadPoolUtils:54->shutdownNow | Forcing shutdown of ExecutorService: java.util.concurrent.ThreadPoolExecutor@316ed8bb[Running, pool size = 1, active threads = 0, queued tasks = 0, completed tasks = 1]
I don't fully understand the problem. Can you post your logs? It is best to set these properties like this:
<bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
    <property name="connectionFactory" ref="connectionFactory" />
    <property name="defaultDestination" ref="defaultDestination" />
    <property name="sessionTransacted" value="true" />
    <property name="sessionAcknowledgeModeName" value="CLIENT_ACKNOWLEDGE" />
</bean>
Update
The use of CachingConnectionFactory as a target for this transaction
manager is strongly recommended. CachingConnectionFactory uses a
single JMS Connection for all JMS access in order to avoid the
overhead of repeated Connection creation, as well as maintaining a
cache of Sessions. Each transaction will then share the same JMS
Connection, while still using its own individual JMS Session.
JmsTransactionManager.connectionFactory
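So point the transaction manager at the same CachingConnectionFactory bean that the JmsTemplate uses (connectionFactory rather than amqConnectionFactory), like this: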
<bean id="jmsTransactionManager" class="org.springframework.jms.connection.JmsTransactionManager">
    <property name="connectionFactory">
        <ref bean="connectionFactory"/>
    </property>
</bean>
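A toy model may help explain why the factory reference matters. In the posted logs, step //2 begins and commits its own transaction (TX ...1:2:1), while the rollback in step //3 lands on a different session (...1:1:1, txid:null). As far as I understand it, Spring's JMS support looks up the thread-bound transactional session using the connection factory as the key, so a transaction started against amqConnectionFactory is not visible to a JmsTemplate built on connectionFactory. The sketch below imitates that lookup in plain Java. It is not Spring code: TxBindingSketch, Factory, begin, sessionFor and end are hypothetical names invented for illustration.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for thread-bound transactional resources; NOT a Spring API.
class TxBindingSketch {

    // Stand-in for a ConnectionFactory bean; only object identity matters here.
    static final class Factory {
        final String name;
        Factory(String name) { this.name = name; }
    }

    // Sessions bound to the current thread, keyed by the factory that opened the transaction.
    static final ThreadLocal<Map<Factory, String>> BOUND =
            ThreadLocal.withInitial(HashMap::new);

    // What a transaction manager does on begin: bind a session under ITS factory.
    static void begin(Factory managerFactory) {
        BOUND.get().put(managerFactory, "tx-session@" + managerFactory.name);
    }

    // What a template does on receive: look up a bound session under ITS factory,
    // and fall back to a private, locally committed session if none is found.
    static String sessionFor(Factory templateFactory) {
        String bound = BOUND.get().get(templateFactory);
        return bound != null ? bound : "local-session@" + templateFactory.name;
    }

    // What a transaction manager does on commit or rollback: unbind its session.
    static void end(Factory managerFactory) {
        BOUND.get().remove(managerFactory);
    }

    public static void main(String[] args) {
        Factory amq = new Factory("amqConnectionFactory");
        Factory caching = new Factory("connectionFactory");

        // Question's setup: manager on amq, template on caching. Keys differ.
        begin(amq);
        System.out.println(sessionFor(caching)); // local-session@connectionFactory
        end(amq);

        // Answer's setup: both on caching. The template joins the transaction.
        begin(caching);
        System.out.println(sessionFor(caching)); // tx-session@connectionFactory
        end(caching);
    }
}
```

In the first case the rollback issued through the manager has nothing to undo, because the receive ran on the template's own session. That matches the commit seen in step //2 of the logs.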