JmsTransactionManager 影响其他消息?

JmsTransactionManager affecting other messages?

我正在尝试让客户端确认模式在 JMS 中工作。我目前有一个队列在自动确认中工作,我不知道还需要做什么才能让它被客户端确认。

执行时,我没有收到任何错误。它似乎只是在处理一些其他消息(日志中的不同 id)。

applicationContext.xml

<bean id="amqConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
        <constructor-arg index="0" value="tcp://localhost:61616" />
    </bean>

<bean id="connectionFactory"
    class="org.springframework.jms.connection.CachingConnectionFactory">
    <constructor-arg ref="amqConnectionFactory" />
</bean>

<bean id="defaultDestination" class="org.apache.activemq.command.ActiveMQQueue">
    <!-- name of the queue -->
    <constructor-arg index="0" value="MyQueue" />
</bean>

<bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
    <property name="connectionFactory" ref="connectionFactory" />
    <property name="defaultDestination" ref="defaultDestination" />
    <property name="sessionTransacted" value="true" />
    <property name="sessionAcknowledgeModeName" value="CLIENT_ACKNOWLEDGE" />
</bean>

<bean id="jmsTransactionManager" class="org.springframework.jms.connection.JmsTransactionManager">
    <property name="connectionFactory">
        <ref bean="amqConnectionFactory"/>
    </property>
</bean>

QueueJMSProxy.java

private JmsTemplate jmsTemplate;

@Autowired
private JmsTransactionManager jmsTransactionManager;

@Autowired
public QueueJMSProxy(JmsTemplate jmsTemplate) {
    this.jmsTemplate = jmsTemplate;
    jmsTemplate.setReceiveTimeout(BAGSchedulerCTE.RECEIVE_JMS_TIMEOUT);
}

public synchronized MyMessage receiveMessageFromBSG() {

    //1
    TransactionStatus status = jmsTransactionManager.getTransaction(new DefaultTransactionDefinition());

    //2
    Message receivedMessage = jmsTemplate.receive();
    MyMessage myMessage = saveInDB(receivedMessage);


    //3
    jmsTransactionManager.rollback(status);

    return myMessage;
}

1、2 和 3 的日志

//1
2017-01-27_11:32:36,209 DEBUG main | transport.WireFormatNegotiator:82->sendWireFormat  | Sending: WireFormatInfo { version=10, properties={MaxFrameSize=9223372036854775807, CacheSize=1024, CacheEnabled=true, Host=localhost, SizePrefixDisabled=false, MaxInactivityDurationInitalDelay=10000, TcpNoDelayEnabled=true, MaxInactivityDuration=30000, TightEncodingEnabled=true, StackTraceEnabled=true}, magic=[A,c,t,i,v,e,M,Q]}
2017-01-27_11:32:36,219 DEBUG ActiveMQ Transport: tcp://localhost/127.0.0.1:61616@60606 | transport.InactivityMonitor:92->configuredOk  | Using min of local: WireFormatInfo { version=10, properties={MaxFrameSize=9223372036854775807, CacheSize=1024, CacheEnabled=true, Host=localhost, SizePrefixDisabled=false, MaxInactivityDurationInitalDelay=10000, TcpNoDelayEnabled=true, MaxInactivityDuration=30000, TightEncodingEnabled=true, StackTraceEnabled=true}, magic=[A,c,t,i,v,e,M,Q]} and remote: WireFormatInfo { version=10, properties={CacheSize=1024, MaxFrameSize=104857600, CacheEnabled=true, SizePrefixDisabled=false, TcpNoDelayEnabled=true, MaxInactivityDurationInitalDelay=10000, MaxInactivityDuration=30000, TightEncodingEnabled=true, StackTraceEnabled=true}, magic=[A,c,t,i,v,e,M,Q]}
2017-01-27_11:32:36,224 DEBUG ActiveMQ Transport: tcp://localhost/127.0.0.1:61616@60606 | transport.WireFormatNegotiator:118->negociate  | Received WireFormat: WireFormatInfo { version=10, properties={CacheSize=1024, MaxFrameSize=104857600, CacheEnabled=true, SizePrefixDisabled=false, TcpNoDelayEnabled=true, MaxInactivityDurationInitalDelay=10000, MaxInactivityDuration=30000, TightEncodingEnabled=true, StackTraceEnabled=true}, magic=[A,c,t,i,v,e,M,Q]}
2017-01-27_11:32:36,226 DEBUG ActiveMQ Transport: tcp://localhost/127.0.0.1:61616@60606 | transport.WireFormatNegotiator:125->negociate  | tcp://localhost/127.0.0.1:61616@60606 before negotiation: OpenWireFormat{version=10, cacheEnabled=false, stackTraceEnabled=false, tightEncodingEnabled=false, sizePrefixDisabled=false, maxFrameSize=9223372036854775807}
2017-01-27_11:32:36,226 DEBUG ActiveMQ Transport: tcp://localhost/127.0.0.1:61616@60606 | transport.WireFormatNegotiator:140->negociate  | tcp://localhost/127.0.0.1:61616@60606 after negotiation: OpenWireFormat{version=10, cacheEnabled=true, stackTraceEnabled=true, tightEncodingEnabled=true, sizePrefixDisabled=false, maxFrameSize=104857600}

//2
2017-01-27_11:32:40,851 DEBUG main | transport.WireFormatNegotiator:82->sendWireFormat  | Sending: WireFormatInfo { version=10, properties={MaxFrameSize=9223372036854775807, CacheSize=1024, CacheEnabled=true, Host=localhost, SizePrefixDisabled=false, MaxInactivityDurationInitalDelay=10000, TcpNoDelayEnabled=true, MaxInactivityDuration=30000, TightEncodingEnabled=true, StackTraceEnabled=true}, magic=[A,c,t,i,v,e,M,Q]}
2017-01-27_11:32:40,852  INFO main | connection.CachingConnectionFactory:311->initConnection  | Established shared JMS Connection: ActiveMQConnection {id=ID:LATES-0008-60605-1485513156084-1:2,clientId=null,started=false}
2017-01-27_11:32:40,856 DEBUG ActiveMQ Transport: tcp://localhost/127.0.0.1:61616@60607 | transport.InactivityMonitor:92->configuredOk  | Using min of local: WireFormatInfo { version=10, properties={MaxFrameSize=9223372036854775807, CacheSize=1024, CacheEnabled=true, Host=localhost, SizePrefixDisabled=false, MaxInactivityDurationInitalDelay=10000, TcpNoDelayEnabled=true, MaxInactivityDuration=30000, TightEncodingEnabled=true, StackTraceEnabled=true}, magic=[A,c,t,i,v,e,M,Q]} and remote: WireFormatInfo { version=10, properties={CacheSize=1024, MaxFrameSize=104857600, CacheEnabled=true, SizePrefixDisabled=false, TcpNoDelayEnabled=true, MaxInactivityDurationInitalDelay=10000, MaxInactivityDuration=30000, TightEncodingEnabled=true, StackTraceEnabled=true}, magic=[A,c,t,i,v,e,M,Q]}
2017-01-27_11:32:40,856 DEBUG ActiveMQ Transport: tcp://localhost/127.0.0.1:61616@60607 | transport.WireFormatNegotiator:118->negociate  | Received WireFormat: WireFormatInfo { version=10, properties={CacheSize=1024, MaxFrameSize=104857600, CacheEnabled=true, SizePrefixDisabled=false, TcpNoDelayEnabled=true, MaxInactivityDurationInitalDelay=10000, MaxInactivityDuration=30000, TightEncodingEnabled=true, StackTraceEnabled=true}, magic=[A,c,t,i,v,e,M,Q]}
2017-01-27_11:32:40,856 DEBUG ActiveMQ Transport: tcp://localhost/127.0.0.1:61616@60607 | transport.WireFormatNegotiator:125->negociate  | tcp://localhost/127.0.0.1:61616@60607 before negotiation: OpenWireFormat{version=10, cacheEnabled=false, stackTraceEnabled=false, tightEncodingEnabled=false, sizePrefixDisabled=false, maxFrameSize=9223372036854775807}
2017-01-27_11:32:40,857 DEBUG ActiveMQ Transport: tcp://localhost/127.0.0.1:61616@60607 | transport.WireFormatNegotiator:140->negociate  | tcp://localhost/127.0.0.1:61616@60607 after negotiation: OpenWireFormat{version=10, cacheEnabled=true, stackTraceEnabled=true, tightEncodingEnabled=true, sizePrefixDisabled=false, maxFrameSize=104857600}
2017-01-27_11:32:40,923 DEBUG ActiveMQ Transport: tcp://localhost/127.0.0.1:61616@60607 | thread.TaskRunnerFactory:91->init  | Initialized TaskRunnerFactory[ActiveMQ Session Task] using ExecutorService: java.util.concurrent.ThreadPoolExecutor@799e037[Running, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 0]
2017-01-27_11:32:40,929 DEBUG main | activemq.TransactionContext:248->begin  | Begin:TX:ID:LATES-0008-60605-1485513156084-1:2:1
2017-01-27_11:32:40,940 DEBUG main | activemq.ActiveMQSession:572->commit  | ID:LATES-0008-60605-1485513156084-1:2:1 Transaction Commit :TX:ID:LATES-0008-60605-1485513156084-1:2:1
2017-01-27_11:32:40,941 DEBUG main | activemq.TransactionContext:317->commit  | Commit: TX:ID:LATES-0008-60605-1485513156084-1:2:1 syncCount: 1

//3
2017-01-27_11:32:47,026 DEBUG main | activemq.ActiveMQSession:593->rollback  | ID:LATES-0008-60605-1485513156084-1:1:1 Transaction Rollback, txid:null
2017-01-27_11:32:47,069 DEBUG main | util.ThreadPoolUtils:136->doShutdown  | Shutdown of ExecutorService: java.util.concurrent.ThreadPoolExecutor@2a6a9ba5[Terminated, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 0] is shutdown: true and terminated: true took: 0.001 seconds.
2017-01-27_11:32:47,071 DEBUG main | tcp.TcpTransport:525->doStop  | Stopping transport tcp://localhost/127.0.0.1:61616@60606
2017-01-27_11:32:47,074 DEBUG main | thread.TaskRunnerFactory:91->init  | Initialized TaskRunnerFactory[ActiveMQ Task] using ExecutorService: java.util.concurrent.ThreadPoolExecutor@316ed8bb[Running, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 0]
2017-01-27_11:32:47,076 DEBUG ActiveMQ Task-1 | tcp.TcpTransport:543->run  | Closed socket Socket[addr=localhost/127.0.0.1,port=61616,localport=60606]
2017-01-27_11:32:47,076 DEBUG main | util.ThreadPoolUtils:54->shutdownNow  | Forcing shutdown of ExecutorService: java.util.concurrent.ThreadPoolExecutor@316ed8bb[Running, pool size = 1, active threads = 0, queued tasks = 0, completed tasks = 1]

我不明白这个问题,你能post你的日志吗,最好像这样设置这些属性:

<bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
    <property name="connectionFactory" ref="connectionFactory" />
    <property name="defaultDestination" ref="defaultDestination" />
    <property name="sessionTransacted" value="true" />
    <property name="sessionAcknowledgeModeName" value="CLIENT_ACKNOWLEDGE" />
</bean>

更新

The use of CachingConnectionFactory as a target for this transaction manager is strongly recommended. CachingConnectionFactory uses a single JMS Connection for all JMS access in order to avoid the overhead of repeated Connection creation, as well as maintaining a cache of Sessions. Each transaction will then share the same JMS Connection, while still using its own individual JMS Session.

http://docs.spring.io/spring/docs/current/javadoc-api/org/springframework/jms/connection/JmsTransactionManager.html

JmsTransactionManager.connectionFactory 像这样

<bean id="jmsTransactionManager" class="org.springframework.jms.connection.JmsTransactionManager">
    <property name="connectionFactory">
        <ref bean="connectionFactory"/>
    </property>
</bean>