spring-amqp 事务语义

spring-amqp transaction semantics

我目前正在测试一个相当简单的示例,该示例涉及与使用 spring amqp 的数据库事务有关的消息传递事务。

使用案例如下:

如果在数据库操作期间出现故障,预期的行为是接收到的消息回滚到总线 (DefaultRequeueRejected = false) 并进入死信队列。也应该回滚发送的消息。

我可以通过以下配置实现:

@Bean
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory, MessageConverter messageConverter) {
    RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
    rabbitTemplate.setMessageConverter(messageConverter);
    rabbitTemplate.setChannelTransacted(true);
    return rabbitTemplate;
}

@Bean
    SimpleMessageListenerContainer subscriberListenerContainer(ConnectionFactory connectionFactory,
                                                              MessageListenerAdapter listenerAdapter,
                                                              PlatformTransactionManager transactionManager) {
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
        container.setConnectionFactory(connectionFactory);
        container.setQueueNames(SUBSCRIBER_QUEUE_NAME);
        container.setMessageListener(listenerAdapter);
        container.setChannelTransacted(true);
        container.setTransactionManager(transactionManager);
        container.setDefaultRequeueRejected(false);
        return container;
    }

所以这工作正常 - 我不明白的是,如果我不在 SimpleMessageListenerContainer 上设置事务管理器,观察到的行为是完全一样的。因此,如果我配置以下行为不会改变:

@Bean
        SimpleMessageListenerContainer subscriberListenerContainer(ConnectionFactory connectionFactory,
                                                                  MessageListenerAdapter listenerAdapter) {
            SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
            container.setConnectionFactory(connectionFactory);
            container.setQueueNames(SUBSCRIBER_QUEUE_NAME);
            container.setMessageListener(listenerAdapter);
            container.setDefaultRequeueRejected(false);
            return container;
        }

有人可以解释一下那里发生了什么吗?为什么第二种情况也有效?如果 PlatformTransactionManagerSimpleMessageListenerContainer 上注册,内部有什么不同。

假设 transactionManager 是您的数据库 tm,因为您的侦听器是 @Transactional,所以这些场景没有太大区别。

在第一种情况下,事务在调用侦听器之前由容器启动(实际上是在从内部队列中检索消息之前,因此即使没有消息也会启动事务)。

在第二种情况下,事务是在我们调用监听器时由事务拦截器启动的。

考虑以下情况:侦听器不是事务性的,但某些下游组件是事务性的;假设侦听器成功调用了该组件,然后在抛出异常之前做了更多工作。在这种情况下,数据库提交会成功并且消息会被拒绝。这可能不是期望的行为,尤其是在消息重新排队时。在这种情况下,通常通过注入数据库tm来同步rabbit事务和数据库事务会更好。

在你的情况下,db 提交和 rabbit ack 之间几乎没有失败的机会,所以这真的不适用于你的情况,你不需要容器中的 tm。