Jms 模板未在事务中接收消息
Jms Template not receiving messages in transaction
配置class
package jms;
import java.sql.SQLException;
import javax.jms.ConnectionFactory;
import javax.sql.DataSource;
import org.apache.activemq.jms.pool.PooledConnectionFactory;
import org.apache.activemq.spring.ActiveMQConnectionFactory;
import org.hsqldb.jdbc.JDBCDriver;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.context.annotation.Primary;
import org.springframework.jdbc.datasource.DataSourceTransactionManager;
import org.springframework.jdbc.datasource.DriverManagerDataSource;
import org.springframework.jms.connection.JmsTransactionManager;
import org.springframework.jms.connection.TransactionAwareConnectionFactoryProxy;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.transaction.PlatformTransactionManager;
import org.springframework.transaction.annotation.EnableTransactionManagement;
/**
 * Spring Java configuration for the JMS example.
 *
 * <p>Scans the {@code jms} package for components, enables annotation-driven
 * transaction management and wires an ActiveMQ connection factory, a
 * {@link JmsTemplate} and a {@link JmsTransactionManager}. The Spring
 * {@code @Configuration} annotation is written fully qualified because this
 * class shares its simple name.
 */
@org.springframework.context.annotation.Configuration
@ComponentScan("jms")
@EnableTransactionManagement
public class Configuration {

    /** Creates the ActiveMQ connection factory targeting {@code tcp://localhost:61616}. */
    @Bean
    public ConnectionFactory connectionFactory() {
        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory();
        factory.setBrokerURL("tcp://localhost:61616");
        return factory;
    }

    /** Creates the template used to receive messages, backed by {@link #connectionFactory()}. */
    @Bean
    public JmsTemplate jmsTemplate() {
        return new JmsTemplate(connectionFactory());
    }

    /** Creates the JMS transaction manager, backed by {@link #connectionFactory()}. */
    @Bean
    public JmsTransactionManager transactionManager() {
        return new JmsTransactionManager(connectionFactory());
    }
}
接收者Class.
import org.springframework.transaction.annotation.Transactional;
/**
 * Receives a message from {@code tempQueue.queue} inside a transaction and
 * then fails on purpose so that the receive can be rolled back.
 */
@Component
@Transactional
public class ReceiverClass {

    @Autowired
    JmsTemplate jmsTemplate;

    /**
     * Receives one message, prints it and throws to simulate a processing
     * failure.
     *
     * <p>The method is {@code public} on purpose: with Spring's proxy-based
     * transaction support (Spring 5.x) {@code @Transactional} is only honoured
     * on public methods. A package-private method is silently invoked without
     * a transaction, so the message is acknowledged and removed from the queue
     * even though an exception is thrown afterwards.
     *
     * @throws RuntimeException always, to trigger the rollback
     */
    @Transactional
    public void func() {
        // The original wrapped this in while (true), but the unconditional
        // throw below means the body could never run more than once.
        Message message = jmsTemplate.receive("tempQueue.queue");
        // println(Object) is null-safe; receive() may return null.
        System.out.println(message);
        throw new RuntimeException();
    }
}
主要Class
package jms;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
/**
 * Entry point: boots the Spring context from {@link Configuration}, looks up
 * the {@code receiverClass} bean and invokes {@code func()} on it.
 */
public class Main {

    /**
     * Runs the example.
     *
     * @param args command-line arguments (unused)
     */
    public static void main(String[] args) {
        // try-with-resources closes the context (and the beans it manages)
        // even though func() always throws.
        try (AnnotationConfigApplicationContext applicationContext =
                new AnnotationConfigApplicationContext(Configuration.class)) {
            ReceiverClass r = (ReceiverClass) applicationContext.getBean("receiverClass");
            r.func();
        }
    }
}
Pom.xml
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <!-- Project coordinates -->
    <groupId>spring</groupId>
    <artifactId>spring</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <packaging>war</packaging>

    <properties>
        <!-- Shared versions so that all Spring modules stay aligned. -->
        <spring.version>5.0.0.RELEASE</spring.version>
        <activemq.version>5.15.3</activemq.version>
    </properties>

    <dependencies>
        <!-- Spring Framework modules -->
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-core</artifactId>
            <version>${spring.version}</version>
        </dependency>
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-context</artifactId>
            <version>${spring.version}</version>
        </dependency>
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-jdbc</artifactId>
            <version>${spring.version}</version>
        </dependency>
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-jms</artifactId>
            <version>${spring.version}</version>
        </dependency>

        <!-- ActiveMQ client with Spring integration -->
        <dependency>
            <groupId>org.apache.activemq</groupId>
            <artifactId>activemq-spring</artifactId>
            <version>${activemq.version}</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <!-- Compile for Java 8 -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>
我想要的是:使用 jmsTemplate 从队列中取出消息并进行一些处理,如果处理过程中出现问题,已取出的消息应当回滚并重新放回队列。但是使用上述配置我无法实现这一点:即使在读取消息之后抛出了异常,消息仍然会从队列中被删除。
我刚运行了一个测试,对我来说一切正常……
/**
 * Spring Boot demo showing a transactional {@code JmsTemplate.receive()}:
 * a message is sent to queue {@code foo}, then {@code Foo.test()} is invoked
 * twice. A {@link RuntimeException} from the first invocation is caught and
 * ignored; the result of the second invocation is printed with an
 * {@code OK:} prefix.
 */
@SpringBootApplication
public class So48774170Application {

    /** Starts the application and closes the context once the runner has finished. */
    public static void main(String[] args) {
        SpringApplication.run(So48774170Application.class, args).close();
    }

    /** JMS wiring: embedded broker, caching connection factory, template and transaction manager. */
    @Configuration
    @EnableTransactionManagement
    public static class Config {

        /** Sends one message and then receives it twice through {@code Foo.test()}. */
        @Bean
        public ApplicationRunner runner(JmsTemplate template, Foo foo) {
            return appArgs -> {
                template.convertAndSend("foo", "bar");
                try {
                    foo.test();
                }
                catch (RuntimeException expected) {
                    // Deliberately ignored: only the outcome of the next call matters here.
                }
                Object secondResult = foo.test();
                System.out.println("OK:" + secondResult);
            };
        }

        /** In-VM, non-persistent ActiveMQ broker connection. */
        @Bean
        @Primary
        public ConnectionFactory cf() {
            ConnectionFactory brokerFactory =
                    new ActiveMQConnectionFactory("vm://localhost?broker.persistent=false");
            return brokerFactory;
        }

        /** Caching wrapper around the primary connection factory. */
        @Bean
        public CachingConnectionFactory ccf(ConnectionFactory cf) {
            CachingConnectionFactory caching = new CachingConnectionFactory(cf);
            return caching;
        }

        /** Template built on the caching connection factory. */
        @Bean
        public JmsTemplate template(CachingConnectionFactory ccf) {
            JmsTemplate jmsTemplate = new JmsTemplate(ccf);
            return jmsTemplate;
        }

        /** JMS transaction manager built on the same caching connection factory as the template. */
        @Bean
        public PlatformTransactionManager transactionManager(CachingConnectionFactory ccf) {
            PlatformTransactionManager manager = new JmsTransactionManager(ccf);
            return manager;
        }
    }
}
和
/**
 * Transactional receiver used by the demo: reads from queue {@code foo} and
 * fails on its very first invocation.
 */
@Component
public class Foo {

    @Autowired
    private JmsTemplate template;

    /** Number of times {@link #test()} has been invoked so far. */
    private int count;

    /**
     * Receives a message from {@code foo} (waiting up to 5 seconds) and prints it.
     *
     * @return the received message
     * @throws RuntimeException on the first invocation only
     */
    @Transactional
    public Message test() {
        template.setReceiveTimeout(5_000);
        final Message received = template.receive("foo");
        System.out.println(received);

        final boolean firstInvocation = (count == 0);
        count++;
        if (firstInvocation) {
            throw new RuntimeException();
        }
        return received;
    }
}
和
ActiveMQTextMessage {commandId = 5, responseRequired = true, messageId = ID:gollum.local-59919-1518553219166-4:1:1:1:1, originalDestination = null, originalTransactionId = null, producerId = ID:gollum.local-59919-1518553219166-4:1:1:1, destination = queue://foo, transactionId = null, expiration = 0, timestamp = 1518553219346, arrival = 0, brokerInTime = 1518553219346, brokerOutTime = 1518553219359, correlationId = null, replyTo = null, persistent = true, type = null, priority = 4, groupID = null, groupSequence = 0, targetConsumerId = null, compressed = false, userID = null, content = null, marshalledProperties = null, dataStructure = null, redeliveryCounter = 0, size = 1030, properties = null, readOnlyProperties = true, readOnlyBody = true, droppable = false, jmsXGroupFirstForConsumer = false, text = bar}
ActiveMQTextMessage {commandId = 5, responseRequired = true, messageId = ID:gollum.local-59919-1518553219166-4:1:1:1:1, originalDestination = null, originalTransactionId = null, producerId = ID:gollum.local-59919-1518553219166-4:1:1:1, destination = queue://foo, transactionId = null, expiration = 0, timestamp = 1518553219346, arrival = 0, brokerInTime = 1518553219346, brokerOutTime = 1518553219359, correlationId = null, replyTo = null, persistent = true, type = null, priority = 4, groupID = null, groupSequence = 0, targetConsumerId = null, compressed = false, userID = null, content = null, marshalledProperties = null, dataStructure = null, redeliveryCounter = 1, size = 1030, properties = null, readOnlyProperties = true, readOnlyBody = true, droppable = false, jmsXGroupFirstForConsumer = false, text = bar}
OK:ActiveMQTextMessage {commandId = 5, responseRequired = true, messageId = ID:gollum.local-59919-1518553219166-4:1:1:1:1, originalDestination = null, originalTransactionId = null, producerId = ID:gollum.local-59919-1518553219166-4:1:1:1, destination = queue://foo, transactionId = null, expiration = 0, timestamp = 1518553219346, arrival = 0, brokerInTime = 1518553219346, brokerOutTime = 1518553219359, correlationId = null, replyTo = null, persistent = true, type = null, priority = 4, groupID = null, groupSequence = 0, targetConsumerId = null, compressed = false, userID = null, content = null, marshalledProperties = null, dataStructure = null, redeliveryCounter = 1, size = 1030, properties = null, readOnlyProperties = true, readOnlyBody = true, droppable = false, jmsXGroupFirstForConsumer = false, text = bar}
我建议你在调试器中运行:在 func() 中放置一个断点,并验证调用堆栈中往下几个栈帧处存在 TransactionInterceptor。如果没有,则说明 @EnableTransactionManagement 的代理机制由于某种原因没有生效。
同时尝试打开 DEBUG 日志记录,看看它是否提供任何线索。
另请注意,建议将 CachingConnectionFactory 与模板一起使用,以避免每次操作都打开一个新连接。
配置class
package jms;
import java.sql.SQLException;
import javax.jms.ConnectionFactory;
import javax.sql.DataSource;
import org.apache.activemq.jms.pool.PooledConnectionFactory;
import org.apache.activemq.spring.ActiveMQConnectionFactory;
import org.hsqldb.jdbc.JDBCDriver;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.context.annotation.Primary;
import org.springframework.jdbc.datasource.DataSourceTransactionManager;
import org.springframework.jdbc.datasource.DriverManagerDataSource;
import org.springframework.jms.connection.JmsTransactionManager;
import org.springframework.jms.connection.TransactionAwareConnectionFactoryProxy;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.transaction.PlatformTransactionManager;
import org.springframework.transaction.annotation.EnableTransactionManagement;
/**
 * Spring Java configuration for the JMS example.
 *
 * <p>Scans the {@code jms} package for components, enables annotation-driven
 * transaction management and wires an ActiveMQ connection factory, a
 * {@link JmsTemplate} and a {@link JmsTransactionManager}. The Spring
 * {@code @Configuration} annotation is written fully qualified because this
 * class shares its simple name.
 */
@org.springframework.context.annotation.Configuration
@ComponentScan("jms")
@EnableTransactionManagement
public class Configuration {

    /** Creates the ActiveMQ connection factory targeting {@code tcp://localhost:61616}. */
    @Bean
    public ConnectionFactory connectionFactory() {
        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory();
        factory.setBrokerURL("tcp://localhost:61616");
        return factory;
    }

    /** Creates the template used to receive messages, backed by {@link #connectionFactory()}. */
    @Bean
    public JmsTemplate jmsTemplate() {
        return new JmsTemplate(connectionFactory());
    }

    /** Creates the JMS transaction manager, backed by {@link #connectionFactory()}. */
    @Bean
    public JmsTransactionManager transactionManager() {
        return new JmsTransactionManager(connectionFactory());
    }
}
接收者Class.
import org.springframework.transaction.annotation.Transactional;
/**
 * Receives a message from {@code tempQueue.queue} inside a transaction and
 * then fails on purpose so that the receive can be rolled back.
 */
@Component
@Transactional
public class ReceiverClass {

    @Autowired
    JmsTemplate jmsTemplate;

    /**
     * Receives one message, prints it and throws to simulate a processing
     * failure.
     *
     * <p>The method is {@code public} on purpose: with Spring's proxy-based
     * transaction support (Spring 5.x) {@code @Transactional} is only honoured
     * on public methods. A package-private method is silently invoked without
     * a transaction, so the message is acknowledged and removed from the queue
     * even though an exception is thrown afterwards.
     *
     * @throws RuntimeException always, to trigger the rollback
     */
    @Transactional
    public void func() {
        // The original wrapped this in while (true), but the unconditional
        // throw below means the body could never run more than once.
        Message message = jmsTemplate.receive("tempQueue.queue");
        // println(Object) is null-safe; receive() may return null.
        System.out.println(message);
        throw new RuntimeException();
    }
}
主要Class
package jms;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
/**
 * Entry point: boots the Spring context from {@link Configuration}, looks up
 * the {@code receiverClass} bean and invokes {@code func()} on it.
 */
public class Main {

    /**
     * Runs the example.
     *
     * @param args command-line arguments (unused)
     */
    public static void main(String[] args) {
        // try-with-resources closes the context (and the beans it manages)
        // even though func() always throws.
        try (AnnotationConfigApplicationContext applicationContext =
                new AnnotationConfigApplicationContext(Configuration.class)) {
            ReceiverClass r = (ReceiverClass) applicationContext.getBean("receiverClass");
            r.func();
        }
    }
}
Pom.xml
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <!-- Project coordinates -->
    <groupId>spring</groupId>
    <artifactId>spring</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <packaging>war</packaging>

    <properties>
        <!-- Shared versions so that all Spring modules stay aligned. -->
        <spring.version>5.0.0.RELEASE</spring.version>
        <activemq.version>5.15.3</activemq.version>
    </properties>

    <dependencies>
        <!-- Spring Framework modules -->
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-core</artifactId>
            <version>${spring.version}</version>
        </dependency>
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-context</artifactId>
            <version>${spring.version}</version>
        </dependency>
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-jdbc</artifactId>
            <version>${spring.version}</version>
        </dependency>
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-jms</artifactId>
            <version>${spring.version}</version>
        </dependency>

        <!-- ActiveMQ client with Spring integration -->
        <dependency>
            <groupId>org.apache.activemq</groupId>
            <artifactId>activemq-spring</artifactId>
            <version>${activemq.version}</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <!-- Compile for Java 8 -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>
我想要的是:使用 jmsTemplate 从队列中取出消息并进行一些处理,如果处理过程中出现问题,已取出的消息应当回滚并重新放回队列。但是使用上述配置我无法实现这一点:即使在读取消息之后抛出了异常,消息仍然会从队列中被删除。
我刚运行了一个测试,对我来说一切正常……
/**
 * Spring Boot demo showing a transactional {@code JmsTemplate.receive()}:
 * a message is sent to queue {@code foo}, then {@code Foo.test()} is invoked
 * twice. A {@link RuntimeException} from the first invocation is caught and
 * ignored; the result of the second invocation is printed with an
 * {@code OK:} prefix.
 */
@SpringBootApplication
public class So48774170Application {

    /** Starts the application and closes the context once the runner has finished. */
    public static void main(String[] args) {
        SpringApplication.run(So48774170Application.class, args).close();
    }

    /** JMS wiring: embedded broker, caching connection factory, template and transaction manager. */
    @Configuration
    @EnableTransactionManagement
    public static class Config {

        /** Sends one message and then receives it twice through {@code Foo.test()}. */
        @Bean
        public ApplicationRunner runner(JmsTemplate template, Foo foo) {
            return appArgs -> {
                template.convertAndSend("foo", "bar");
                try {
                    foo.test();
                }
                catch (RuntimeException expected) {
                    // Deliberately ignored: only the outcome of the next call matters here.
                }
                Object secondResult = foo.test();
                System.out.println("OK:" + secondResult);
            };
        }

        /** In-VM, non-persistent ActiveMQ broker connection. */
        @Bean
        @Primary
        public ConnectionFactory cf() {
            ConnectionFactory brokerFactory =
                    new ActiveMQConnectionFactory("vm://localhost?broker.persistent=false");
            return brokerFactory;
        }

        /** Caching wrapper around the primary connection factory. */
        @Bean
        public CachingConnectionFactory ccf(ConnectionFactory cf) {
            CachingConnectionFactory caching = new CachingConnectionFactory(cf);
            return caching;
        }

        /** Template built on the caching connection factory. */
        @Bean
        public JmsTemplate template(CachingConnectionFactory ccf) {
            JmsTemplate jmsTemplate = new JmsTemplate(ccf);
            return jmsTemplate;
        }

        /** JMS transaction manager built on the same caching connection factory as the template. */
        @Bean
        public PlatformTransactionManager transactionManager(CachingConnectionFactory ccf) {
            PlatformTransactionManager manager = new JmsTransactionManager(ccf);
            return manager;
        }
    }
}
和
/**
 * Transactional receiver used by the demo: reads from queue {@code foo} and
 * fails on its very first invocation.
 */
@Component
public class Foo {

    @Autowired
    private JmsTemplate template;

    /** Number of times {@link #test()} has been invoked so far. */
    private int count;

    /**
     * Receives a message from {@code foo} (waiting up to 5 seconds) and prints it.
     *
     * @return the received message
     * @throws RuntimeException on the first invocation only
     */
    @Transactional
    public Message test() {
        template.setReceiveTimeout(5_000);
        final Message received = template.receive("foo");
        System.out.println(received);

        final boolean firstInvocation = (count == 0);
        count++;
        if (firstInvocation) {
            throw new RuntimeException();
        }
        return received;
    }
}
和
ActiveMQTextMessage {commandId = 5, responseRequired = true, messageId = ID:gollum.local-59919-1518553219166-4:1:1:1:1, originalDestination = null, originalTransactionId = null, producerId = ID:gollum.local-59919-1518553219166-4:1:1:1, destination = queue://foo, transactionId = null, expiration = 0, timestamp = 1518553219346, arrival = 0, brokerInTime = 1518553219346, brokerOutTime = 1518553219359, correlationId = null, replyTo = null, persistent = true, type = null, priority = 4, groupID = null, groupSequence = 0, targetConsumerId = null, compressed = false, userID = null, content = null, marshalledProperties = null, dataStructure = null, redeliveryCounter = 0, size = 1030, properties = null, readOnlyProperties = true, readOnlyBody = true, droppable = false, jmsXGroupFirstForConsumer = false, text = bar}
ActiveMQTextMessage {commandId = 5, responseRequired = true, messageId = ID:gollum.local-59919-1518553219166-4:1:1:1:1, originalDestination = null, originalTransactionId = null, producerId = ID:gollum.local-59919-1518553219166-4:1:1:1, destination = queue://foo, transactionId = null, expiration = 0, timestamp = 1518553219346, arrival = 0, brokerInTime = 1518553219346, brokerOutTime = 1518553219359, correlationId = null, replyTo = null, persistent = true, type = null, priority = 4, groupID = null, groupSequence = 0, targetConsumerId = null, compressed = false, userID = null, content = null, marshalledProperties = null, dataStructure = null, redeliveryCounter = 1, size = 1030, properties = null, readOnlyProperties = true, readOnlyBody = true, droppable = false, jmsXGroupFirstForConsumer = false, text = bar}
OK:ActiveMQTextMessage {commandId = 5, responseRequired = true, messageId = ID:gollum.local-59919-1518553219166-4:1:1:1:1, originalDestination = null, originalTransactionId = null, producerId = ID:gollum.local-59919-1518553219166-4:1:1:1, destination = queue://foo, transactionId = null, expiration = 0, timestamp = 1518553219346, arrival = 0, brokerInTime = 1518553219346, brokerOutTime = 1518553219359, correlationId = null, replyTo = null, persistent = true, type = null, priority = 4, groupID = null, groupSequence = 0, targetConsumerId = null, compressed = false, userID = null, content = null, marshalledProperties = null, dataStructure = null, redeliveryCounter = 1, size = 1030, properties = null, readOnlyProperties = true, readOnlyBody = true, droppable = false, jmsXGroupFirstForConsumer = false, text = bar}
我建议你在调试器中运行:在 func() 中放置一个断点,并验证调用堆栈中往下几个栈帧处存在 TransactionInterceptor。如果没有,则说明 @EnableTransactionManagement 的代理机制由于某种原因没有生效。
同时尝试打开 DEBUG 日志记录,看看它是否提供任何线索。
另请注意,建议将 CachingConnectionFactory 与模板一起使用,以避免每次操作都打开一个新连接。