spring-amqp 事务语义
spring-amqp transaction semantics
我目前正在测试一个相当简单的示例,该示例涉及与使用 spring amqp 的数据库事务有关的消息传递事务。
使用案例如下:
- 收到消息
- 一条消息已发送
数据库已更新
@Transactional
public void handleMessage(EventPayload event) {
MyEntity entity = new MyEntity();
entity.setName(event.getName());
rabbitTemplate.convertAndSend("myExchange", "payload.create", payload);
MyEntity savedEntity = entityRepository.save(entity);
}
如果在数据库操作期间出现故障,预期的行为是接收到的消息回滚到总线 (DefaultRequeueRejected = false) 并进入死信队列。也应该回滚发送的消息。
我可以通过以下配置实现:
@Bean
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory, MessageConverter messageConverter) {
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
rabbitTemplate.setMessageConverter(messageConverter);
rabbitTemplate.setChannelTransacted(true);
return rabbitTemplate;
}
@Bean
SimpleMessageListenerContainer subscriberListenerContainer(ConnectionFactory connectionFactory,
MessageListenerAdapter listenerAdapter,
PlatformTransactionManager transactionManager) {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
container.setConnectionFactory(connectionFactory);
container.setQueueNames(SUBSCRIBER_QUEUE_NAME);
container.setMessageListener(listenerAdapter);
container.setChannelTransacted(true);
container.setTransactionManager(transactionManager);
container.setDefaultRequeueRejected(false);
return container;
}
所以这工作正常 - 我不明白的是,如果我不在 SimpleMessageListenerContainer
上设置事务管理器,观察到的行为是完全一样的。因此,如果我配置以下行为不会改变:
@Bean
SimpleMessageListenerContainer subscriberListenerContainer(ConnectionFactory connectionFactory,
MessageListenerAdapter listenerAdapter) {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
container.setConnectionFactory(connectionFactory);
container.setQueueNames(SUBSCRIBER_QUEUE_NAME);
container.setMessageListener(listenerAdapter);
container.setDefaultRequeueRejected(false);
return container;
}
有人可以解释一下那里发生了什么吗?为什么第二种情况也有效?如果 PlatformTransactionManager
在 SimpleMessageListenerContainer
上注册,内部有什么不同。
假设 transactionManager
是您的数据库 tm,因为您的侦听器是 @Transactional
,所以这些场景没有太大区别。
在第一种情况下,事务在调用侦听器之前由容器启动(实际上是在从内部队列中检索消息之前,因此即使没有消息也会启动事务)。
在第二种情况下,事务是在我们调用监听器时由事务拦截器启动的。
考虑以下情况:侦听器不是事务性的,但某些下游组件是事务性的;假设侦听器成功调用了该组件,然后在抛出异常之前做了更多工作。在这种情况下,数据库提交会成功并且消息会被拒绝。这可能不是期望的行为,尤其是在消息重新排队时。在这种情况下,通常通过注入数据库tm来同步rabbit事务和数据库事务会更好。
在你的情况下,db 提交和 rabbit ack 之间几乎没有失败的机会,所以这真的不适用于你的情况,你不需要容器中的 tm。
我目前正在测试一个相当简单的示例,该示例涉及与使用 spring amqp 的数据库事务有关的消息传递事务。
使用案例如下:
- 收到消息
- 一条消息已发送
数据库已更新
@Transactional public void handleMessage(EventPayload event) { MyEntity entity = new MyEntity(); entity.setName(event.getName()); rabbitTemplate.convertAndSend("myExchange", "payload.create", payload); MyEntity savedEntity = entityRepository.save(entity); }
如果在数据库操作期间出现故障,预期的行为是接收到的消息回滚到总线 (DefaultRequeueRejected = false) 并进入死信队列。也应该回滚发送的消息。
我可以通过以下配置实现:
@Bean
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory, MessageConverter messageConverter) {
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
rabbitTemplate.setMessageConverter(messageConverter);
rabbitTemplate.setChannelTransacted(true);
return rabbitTemplate;
}
@Bean
SimpleMessageListenerContainer subscriberListenerContainer(ConnectionFactory connectionFactory,
MessageListenerAdapter listenerAdapter,
PlatformTransactionManager transactionManager) {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
container.setConnectionFactory(connectionFactory);
container.setQueueNames(SUBSCRIBER_QUEUE_NAME);
container.setMessageListener(listenerAdapter);
container.setChannelTransacted(true);
container.setTransactionManager(transactionManager);
container.setDefaultRequeueRejected(false);
return container;
}
所以这工作正常 - 我不明白的是,如果我不在 SimpleMessageListenerContainer
上设置事务管理器,观察到的行为是完全一样的。因此,如果我配置以下行为不会改变:
@Bean
SimpleMessageListenerContainer subscriberListenerContainer(ConnectionFactory connectionFactory,
MessageListenerAdapter listenerAdapter) {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
container.setConnectionFactory(connectionFactory);
container.setQueueNames(SUBSCRIBER_QUEUE_NAME);
container.setMessageListener(listenerAdapter);
container.setDefaultRequeueRejected(false);
return container;
}
有人可以解释一下那里发生了什么吗?为什么第二种情况也有效?如果 PlatformTransactionManager
在 SimpleMessageListenerContainer
上注册,内部有什么不同。
假设 transactionManager
是您的数据库 tm,因为您的侦听器是 @Transactional
,所以这些场景没有太大区别。
在第一种情况下,事务在调用侦听器之前由容器启动(实际上是在从内部队列中检索消息之前,因此即使没有消息也会启动事务)。
在第二种情况下,事务是在我们调用监听器时由事务拦截器启动的。
考虑以下情况:侦听器不是事务性的,但某些下游组件是事务性的;假设侦听器成功调用了该组件,然后在抛出异常之前做了更多工作。在这种情况下,数据库提交会成功并且消息会被拒绝。这可能不是期望的行为,尤其是在消息重新排队时。在这种情况下,通常通过注入数据库tm来同步rabbit事务和数据库事务会更好。
在你的情况下,db 提交和 rabbit ack 之间几乎没有失败的机会,所以这真的不适用于你的情况,你不需要容器中的 tm。