RabbitMqTemplate 在分布式事务中与数据库的接收操作
RabbitMqTemplate Receive Operation in Distributed Transaction with Database
我需要创建一个简单的应用程序,
- 从 RabbitMq 队列中读取第一条可用消息
- 将对象保存到数据库中Table
应用程序是 MVC 类型,并有一个公开的 http 端点来触发用户的操作。
调用中的所有操作都需要参与分布式事务,因此在一个工作单元下。
代码 class 是这样的:-
队列操作class:
public class ReadFromQueue {
pubic String readMessage(queue name) {
try {
rabbitTemplate.receiveAndConvert(queue);
} catch (AmqpException ex) {
return "failure";
}
}
数据库操作class:
public class SavetoDatabase {
public String saveObject(object){
try { pigRepository.saveAndFlush(objectMapper.readValue(receivedPig,Pig.class));
} catch (Exception ex) {
ex.printStackTrace();
return "failure";
}
return "success";
}
}
控制器class:
@RestController
public class MyController {
@Transactional
@RequestMapping("/read")
public String readAndSavePigFromQueue(){
String databaseSaveResult;
String messageFromQueue = ReadFromQueue.readMessageFromQueue(queueName);
// Some checks on returned message
databaseSaveResult = SaveToObject.savePigToDatabase(messageFromQueue);
if ("failure" == databaseSaveResult) {
return "Failed Transaction";
}
return "Successful transaction";
}
}
现在的挑战是,如何在这些分布式资源上以事务方式控制这个完整的操作。
RabbitMq据说是unspportive
到distributed transactions
,因此受到关注。
我希望任何失败的保存到数据库操作都会导致消息回滚到队列,并使失败的消息返回原始状态。
有什么建议吗?
经过更多搜索,文章 - http://docs.spring.io/spring-amqp/docs/1.2.0.RELEASE/reference/html/amqp.html#d4e602 帮助找到了解决方案,该解决方案位于 RabbitMqTemplate
的 isChannelTransacted
标志中
如果您希望任何 RabbitTemplate 操作参与事务,需要确保标志设置为 'true'。
可以使用下面的 bean 配置来实现相同的目的。
@Bean
public ConnectionFactory connectionFactory(){
return new CachingConnectionFactory();
}
@Bean
public RabbitTemplate rabbitTemplate(){
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory());
rabbitTemplate.setChannelTransacted(true);
return rabbitTemplate;
}
如果有人知道更好的实施方法,请post您的回复以帮助我们做到最好。
我需要创建一个简单的应用程序,
- 从 RabbitMq 队列中读取第一条可用消息
- 将对象保存到数据库中Table
应用程序是 MVC 类型,并有一个公开的 http 端点来触发用户的操作。
调用中的所有操作都需要参与分布式事务,因此在一个工作单元下。
代码 class 是这样的:-
队列操作class:
public class ReadFromQueue {
pubic String readMessage(queue name) {
try {
rabbitTemplate.receiveAndConvert(queue);
} catch (AmqpException ex) {
return "failure";
}
}
数据库操作class:
public class SavetoDatabase {
public String saveObject(object){
try { pigRepository.saveAndFlush(objectMapper.readValue(receivedPig,Pig.class));
} catch (Exception ex) {
ex.printStackTrace();
return "failure";
}
return "success";
}
}
控制器class:
@RestController
public class MyController {
@Transactional
@RequestMapping("/read")
public String readAndSavePigFromQueue(){
String databaseSaveResult;
String messageFromQueue = ReadFromQueue.readMessageFromQueue(queueName);
// Some checks on returned message
databaseSaveResult = SaveToObject.savePigToDatabase(messageFromQueue);
if ("failure" == databaseSaveResult) {
return "Failed Transaction";
}
return "Successful transaction";
}
}
现在的挑战是,如何在这些分布式资源上以事务方式控制这个完整的操作。
RabbitMq据说是unspportive
到distributed transactions
,因此受到关注。
我希望任何失败的保存到数据库操作都会导致消息回滚到队列,并使失败的消息返回原始状态。
有什么建议吗?
经过更多搜索,文章 - http://docs.spring.io/spring-amqp/docs/1.2.0.RELEASE/reference/html/amqp.html#d4e602 帮助找到了解决方案,该解决方案位于 RabbitMqTemplate
isChannelTransacted
标志中
如果您希望任何 RabbitTemplate 操作参与事务,需要确保标志设置为 'true'。
可以使用下面的 bean 配置来实现相同的目的。
@Bean
public ConnectionFactory connectionFactory(){
return new CachingConnectionFactory();
}
@Bean
public RabbitTemplate rabbitTemplate(){
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory());
rabbitTemplate.setChannelTransacted(true);
return rabbitTemplate;
}
如果有人知道更好的实施方法,请post您的回复以帮助我们做到最好。