Bitronix - JMS 和 JDBC - 消息因异常而出队
Bitronix - JMS and JDBC - Message is dequeued on Exception
我正在尝试将 Bitronix 事务管理器集成到我的 Spring 引导项目中以一起管理 jdbc 和 jms 事务。我有两个数据库和一个用于 jms 的 ActiveMQ 代理。我已经在同一个事务中连接了数据库,但是当我尝试包含 JMS 时,它似乎不起作用。
这是我的 Bitronix 事务管理器配置:
@Configuration
@EnableTransactionManagement
public class BitronixJtaConfiguration {
private static final Logger log = LoggerFactory.getLogger(BitronixJtaConfiguration.class);
@Value("${bitronix.tm.serverId}")
private String serverId;
@Value("${bitronix.tm.journal.disk.logPart1Filename:}")
private String logPart1Filename;
@Value("${bitronix.tm.journal.disk.logPart2Filename:}")
private String logPart2Filename;
@Bean
public bitronix.tm.Configuration transactionManagerServices() {
bitronix.tm.Configuration configuration = TransactionManagerServices.getConfiguration();
configuration.setServerId(serverId);
if ("".equals(logPart1Filename) && "".equals(logPart2Filename)) {
configuration.setJournal(null);
log.info("Disable journal for testing.");
} else {
configuration.setLogPart1Filename(logPart1Filename);
configuration.setLogPart2Filename(logPart2Filename);
}
return configuration;
}
@Bean
public TransactionManager transactionManager() {
return TransactionManagerServices.getTransactionManager();
}
@Bean
public UserTransaction userTransaction() {
return TransactionManagerServices.getTransactionManager();
}
@Bean
public PlatformTransactionManager platformTransactionManager() {
UserTransaction userTransaction = userTransaction();
TransactionManager transactionManager = transactionManager();
return new JtaTransactionManager(userTransaction, transactionManager);
}
}
这是我的数据库配置之一class:
@Configuration
@EnableTransactionManagement
public class TransportationPlanDBConfig {
private static final Logger LOGGER = LoggerFactory.getLogger("ppalfile");
@Value("${tp.jdbc.driverClassName}")
private String driverClassName;
@Value("${tp.jdbc.username}")
private String username;
@Value("${tp.jdbc.url}")
private String url;
@Value("${tp.jdbc.password}")
private String password;
@Value("${tp.c3p0.max_size}")
private int c3p0MaxSize;
@Value("${tp.c3p0.min_size}")
private int c3p0MinSize;
@Value("${tp.c3p0.unreturned_connection_timeout}")
private int c3p0UnreturnedConnectionTimeout;
@Value("${tp.c3p0.acquire_increment}")
private int c3p0AcquireIncrement;
@Value("${tp.c3p0.max_idle_time}")
private int c3p0MaxIdleTime;
public TransportationPlanDBConfig() {
// Empty constructor
}
@Bean(name = "tpds", destroyMethod = "close")
public DataSource dataSource() {
LOGGER.debug("Creating Transportation plan DS");
PoolingDataSource poolingDataSource = new PoolingDataSource();
poolingDataSource.setClassName(driverClassName);
poolingDataSource.setUniqueName("tpds");
Properties props = new Properties();
props.put("url", url);
props.put("user", username);
props.put("password", password);
poolingDataSource.setDriverProperties(props);
poolingDataSource.setAllowLocalTransactions(true);
poolingDataSource.setMaxPoolSize(c3p0MaxSize);
poolingDataSource.init();
return poolingDataSource;
}
@Bean(name = "tpJdbcTemplate")
JdbcTemplate jdbcTemplate(@Qualifier("tpds") DataSource dataSource) {
LOGGER.debug("Creating JdbcTemplate transport plan");
JdbcTemplate jdbcTemplate = new JdbcTemplate(dataSource);
LOGGER.debug(" JdbcTemplate Transport Plan created ");
return jdbcTemplate;
}
}
我的 ActiveMQ 配置class:
@Configuration
@EnableTransactionManagement
public class ActivesMQsConfiguration {
@Bean
public ConnectionFactory jmsConnectionFactoryLocal() {
PoolingConnectionFactory btmPoolingConnectionFactory = new PoolingConnectionFactory();
btmPoolingConnectionFactory.setClassName("org.apache.activemq.ActiveMQXAConnectionFactory");
btmPoolingConnectionFactory.setUniqueName("AMQLocal");
btmPoolingConnectionFactory.setMinPoolSize(1);
btmPoolingConnectionFactory.setMaxPoolSize(5);
btmPoolingConnectionFactory.setAllowLocalTransactions(true);
btmPoolingConnectionFactory.setUser("admin");
btmPoolingConnectionFactory.setPassword("admin");
btmPoolingConnectionFactory.getDriverProperties().setProperty("brokerURL", "tcp://localhost:61616");
btmPoolingConnectionFactory.init();
return btmPoolingConnectionFactory;
}
@Bean
public JmsListenerContainerFactory<?> jmsListenerContainerFactoryLocal(
@Qualifier("jmsConnectionFactoryLocal") ConnectionFactory connectionFactory,
DefaultJmsListenerContainerFactoryConfigurer configurer) {
DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
factory.setConnectionFactory(connectionFactory);
factory.setSessionTransacted(true);
configurer.configure(factory, connectionFactory);
return factory;
}
}
我的 JMS 侦听器实现:
@Component
@Transactional
public class ContactTransactionReceiver {
private int mensajesConsumer2 = 0;
@Autowired
@Qualifier("versionJdbcTemplate")
private JdbcTemplate versionJdbcTemplate;
@Autowired
@Qualifier("tpJdbcTemplate")
private JdbcTemplate tpjdbcTemplate;
@Autowired
private VersionsConfDao versionsConfDao;
@Autowired
private TrainDao trainDao;
@Transactional(rollbackFor=Exception.class)
@JmsListener(destination = "Consumer.consumer2.VirtualTopic.TopicPrueba")
public void receiveMessageFromContacts2(Message message) throws Exception {
mensajesConsumer2++;
TextMessage txtMessage = (TextMessage) message;
System.out.println("Segundo consumer:" + txtMessage.getText() + " recibidos:" + mensajesConsumer2);
VersionsConf versionsconf = new VersionsConf("V" + mensajesConsumer2, "V" + mensajesConsumer2, false,new Timestamp(1L), 1);
VersionsConf versionsResult = versionsConfDao.insertUpdate(versionJdbcTemplate, versionsconf);
if (mensajesConsumer2 == 2) {
throw new Exception();
}
Train train = new Train("101"+mensajesConsumer2, 1L, 2L, false, true, "atp");
Train trainResult = trainDao.insertUpdate(tpjdbcTemplate, train);
if (mensajesConsumer2 == 3) {
throw new Exception();
}
}
}
根据我对 Bitronix 功能的理解,我的监听器实现:
对于第一条传入消息:必须在每个数据库中插入一行并使消息出列。 -> 这很好用。
关于第二条和第三条传入消息:由于异常必须插入 0 行并将消息保留在队列中。 -> 未插入任何行,但消息已出队。
此外,我想补充一点,它在执行过程中记录了以下内容:
[main] bitronix.tm.recovery.Recoverer:恢复在 4 个资源 [AMQLocal、vds、AMQRemote、tpds]
上提交了 0 个悬空事务并回滚了 0 个中止事务
所以,我了解到两个经纪人和两个数据库都已注册。但是当侦听器处理第二条消息时(它抛出异常),它记录:
WARN 5740 [Session Task-1] bitronix.tm.twopc.Preparer:使用 0 个登记资源执行事务
对这个问题有什么想法吗??
您可以在以下位置找到完整代码:https://github.com/PedroRamirezTOR/spring-jms-jdbc-integration.git
谢谢!
首先,recovery committed 0 dangling transaction(s) and rolled back 0 aborted transaction(s) on 4 resource(s)
消息会不时出现,这是完全正常的。只要提交和回滚计数器为零,您就可以忽略它。
executing transaction with 0 enlisted resource
日志看起来像真的。
我高度怀疑您的 Spring 设置有问题。无论如何我都不是 Spring 专家,但是 DefaultJmsListenerContainerFactory
应该引用您的 Spring PlatformTransactionManager
实例,以便它知道它必须以事务方式工作,所以你应该打电话给 factory.setTransactionManager(PlatformTransactionManager)
.
这至少应该让您进入下一步。
我正在尝试将 Bitronix 事务管理器集成到我的 Spring 引导项目中以一起管理 jdbc 和 jms 事务。我有两个数据库和一个用于 jms 的 ActiveMQ 代理。我已经在同一个事务中连接了数据库,但是当我尝试包含 JMS 时,它似乎不起作用。
这是我的 Bitronix 事务管理器配置:
@Configuration
@EnableTransactionManagement
public class BitronixJtaConfiguration {
private static final Logger log = LoggerFactory.getLogger(BitronixJtaConfiguration.class);
@Value("${bitronix.tm.serverId}")
private String serverId;
@Value("${bitronix.tm.journal.disk.logPart1Filename:}")
private String logPart1Filename;
@Value("${bitronix.tm.journal.disk.logPart2Filename:}")
private String logPart2Filename;
@Bean
public bitronix.tm.Configuration transactionManagerServices() {
bitronix.tm.Configuration configuration = TransactionManagerServices.getConfiguration();
configuration.setServerId(serverId);
if ("".equals(logPart1Filename) && "".equals(logPart2Filename)) {
configuration.setJournal(null);
log.info("Disable journal for testing.");
} else {
configuration.setLogPart1Filename(logPart1Filename);
configuration.setLogPart2Filename(logPart2Filename);
}
return configuration;
}
@Bean
public TransactionManager transactionManager() {
return TransactionManagerServices.getTransactionManager();
}
@Bean
public UserTransaction userTransaction() {
return TransactionManagerServices.getTransactionManager();
}
@Bean
public PlatformTransactionManager platformTransactionManager() {
UserTransaction userTransaction = userTransaction();
TransactionManager transactionManager = transactionManager();
return new JtaTransactionManager(userTransaction, transactionManager);
}
}
这是我的数据库配置之一class:
@Configuration
@EnableTransactionManagement
public class TransportationPlanDBConfig {
private static final Logger LOGGER = LoggerFactory.getLogger("ppalfile");
@Value("${tp.jdbc.driverClassName}")
private String driverClassName;
@Value("${tp.jdbc.username}")
private String username;
@Value("${tp.jdbc.url}")
private String url;
@Value("${tp.jdbc.password}")
private String password;
@Value("${tp.c3p0.max_size}")
private int c3p0MaxSize;
@Value("${tp.c3p0.min_size}")
private int c3p0MinSize;
@Value("${tp.c3p0.unreturned_connection_timeout}")
private int c3p0UnreturnedConnectionTimeout;
@Value("${tp.c3p0.acquire_increment}")
private int c3p0AcquireIncrement;
@Value("${tp.c3p0.max_idle_time}")
private int c3p0MaxIdleTime;
public TransportationPlanDBConfig() {
// Empty constructor
}
@Bean(name = "tpds", destroyMethod = "close")
public DataSource dataSource() {
LOGGER.debug("Creating Transportation plan DS");
PoolingDataSource poolingDataSource = new PoolingDataSource();
poolingDataSource.setClassName(driverClassName);
poolingDataSource.setUniqueName("tpds");
Properties props = new Properties();
props.put("url", url);
props.put("user", username);
props.put("password", password);
poolingDataSource.setDriverProperties(props);
poolingDataSource.setAllowLocalTransactions(true);
poolingDataSource.setMaxPoolSize(c3p0MaxSize);
poolingDataSource.init();
return poolingDataSource;
}
@Bean(name = "tpJdbcTemplate")
JdbcTemplate jdbcTemplate(@Qualifier("tpds") DataSource dataSource) {
LOGGER.debug("Creating JdbcTemplate transport plan");
JdbcTemplate jdbcTemplate = new JdbcTemplate(dataSource);
LOGGER.debug(" JdbcTemplate Transport Plan created ");
return jdbcTemplate;
}
}
我的 ActiveMQ 配置class:
@Configuration
@EnableTransactionManagement
public class ActivesMQsConfiguration {
@Bean
public ConnectionFactory jmsConnectionFactoryLocal() {
PoolingConnectionFactory btmPoolingConnectionFactory = new PoolingConnectionFactory();
btmPoolingConnectionFactory.setClassName("org.apache.activemq.ActiveMQXAConnectionFactory");
btmPoolingConnectionFactory.setUniqueName("AMQLocal");
btmPoolingConnectionFactory.setMinPoolSize(1);
btmPoolingConnectionFactory.setMaxPoolSize(5);
btmPoolingConnectionFactory.setAllowLocalTransactions(true);
btmPoolingConnectionFactory.setUser("admin");
btmPoolingConnectionFactory.setPassword("admin");
btmPoolingConnectionFactory.getDriverProperties().setProperty("brokerURL", "tcp://localhost:61616");
btmPoolingConnectionFactory.init();
return btmPoolingConnectionFactory;
}
@Bean
public JmsListenerContainerFactory<?> jmsListenerContainerFactoryLocal(
@Qualifier("jmsConnectionFactoryLocal") ConnectionFactory connectionFactory,
DefaultJmsListenerContainerFactoryConfigurer configurer) {
DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
factory.setConnectionFactory(connectionFactory);
factory.setSessionTransacted(true);
configurer.configure(factory, connectionFactory);
return factory;
}
}
我的 JMS 侦听器实现:
@Component
@Transactional
public class ContactTransactionReceiver {
private int mensajesConsumer2 = 0;
@Autowired
@Qualifier("versionJdbcTemplate")
private JdbcTemplate versionJdbcTemplate;
@Autowired
@Qualifier("tpJdbcTemplate")
private JdbcTemplate tpjdbcTemplate;
@Autowired
private VersionsConfDao versionsConfDao;
@Autowired
private TrainDao trainDao;
@Transactional(rollbackFor=Exception.class)
@JmsListener(destination = "Consumer.consumer2.VirtualTopic.TopicPrueba")
public void receiveMessageFromContacts2(Message message) throws Exception {
mensajesConsumer2++;
TextMessage txtMessage = (TextMessage) message;
System.out.println("Segundo consumer:" + txtMessage.getText() + " recibidos:" + mensajesConsumer2);
VersionsConf versionsconf = new VersionsConf("V" + mensajesConsumer2, "V" + mensajesConsumer2, false,new Timestamp(1L), 1);
VersionsConf versionsResult = versionsConfDao.insertUpdate(versionJdbcTemplate, versionsconf);
if (mensajesConsumer2 == 2) {
throw new Exception();
}
Train train = new Train("101"+mensajesConsumer2, 1L, 2L, false, true, "atp");
Train trainResult = trainDao.insertUpdate(tpjdbcTemplate, train);
if (mensajesConsumer2 == 3) {
throw new Exception();
}
}
}
根据我对 Bitronix 功能的理解,我的监听器实现:
对于第一条传入消息:必须在每个数据库中插入一行并使消息出列。 -> 这很好用。
关于第二条和第三条传入消息:由于异常必须插入 0 行并将消息保留在队列中。 -> 未插入任何行,但消息已出队。
此外,我想补充一点,它在执行过程中记录了以下内容: [main] bitronix.tm.recovery.Recoverer:恢复在 4 个资源 [AMQLocal、vds、AMQRemote、tpds]
上提交了 0 个悬空事务并回滚了 0 个中止事务所以,我了解到两个经纪人和两个数据库都已注册。但是当侦听器处理第二条消息时(它抛出异常),它记录:
WARN 5740 [Session Task-1] bitronix.tm.twopc.Preparer:使用 0 个登记资源执行事务
对这个问题有什么想法吗??
您可以在以下位置找到完整代码:https://github.com/PedroRamirezTOR/spring-jms-jdbc-integration.git
谢谢!
首先,recovery committed 0 dangling transaction(s) and rolled back 0 aborted transaction(s) on 4 resource(s)
消息会不时出现,这是完全正常的。只要提交和回滚计数器为零,您就可以忽略它。
executing transaction with 0 enlisted resource
日志看起来像真的。
我高度怀疑您的 Spring 设置有问题。无论如何我都不是 Spring 专家,但是 DefaultJmsListenerContainerFactory
应该引用您的 Spring PlatformTransactionManager
实例,以便它知道它必须以事务方式工作,所以你应该打电话给 factory.setTransactionManager(PlatformTransactionManager)
.
这至少应该让您进入下一步。