分布式数据库事务中的 RabbitMQ 和交付保证

RabbitMQ and Delivery Guarantees in Distributed Database Transaction

我试图了解在分布式数据库事务的上下文中处理 RabbitMQ 交付的正确模式是什么。

为简单起见,我将用伪代码说明我的想法,但实际上我正在使用 Spring AMQP 来实现这些想法。

任何类似

void foo(message) {
   processMessageInDatabaseTransaction(message);
   sendMessageToRabbitMQ(message);
}

当我们到达 sendMessageToRabbitMQ() 时,processMessageInDatabaseTransaction() 已成功将其更改提交到数据库,或者在到达消息发送代码之前已抛出异常。

我知道对于 sendMessageToRabbitMQ() 我可以使用 Rabbit transactions or publisher confirms 来保证 Rabbit 收到我的消息。

我的兴趣是了解当事情变糟时应该发生什么,即当数据库事务成功时,但在一定时间后确认没有到达(发布者确认)或 Rabbit 事务未能提交(用 Rabbit交易)。

一旦发生这种情况,保证我的消息传递的正确模式是什么?

当然,开发幂等消费者后,我考虑过可以重试发送消息,直到Rabbit确认成功:

void foo(message) {
   processMessageInDatabaseTransaction(message);
   retryUntilSuccessFull {
      sendMessagesToRabbitMQ(message);
   }
}

但是这种模式有几个我不喜欢的缺点,首先,如果失败时间延长,我的线程将开始阻塞在这里,我的系统最终将变得无响应。其次,如果我的系统崩溃或关闭会怎样?那时我永远不会传递这些消息,因为它们会丢失。

所以,我想,好吧,我必须先将我的消息写入数据库,处于待处理状态,然后从那里发布我的待处理消息:

void foo(message) {
   //transaction commits leaving message in pending status
   processMessageInDatabaseTransaction(message);
}

@Poller(every="10 seconds")
void bar() {
   for(message in readPendingMessagesFromDbStore()) {
      sendPendingMessageToRabbitMQ(message);
      if(confirmed) {
          acknowledgeMessageInDatabase(message); 
      }
   }
}

如果我未能在我的数据库中确认消息,可能会多次发送消息。

但是现在我引入了其他问题:

然后我想好吧,我可以让它更复杂一点,比如,我可以从数据库发布,直到我赶上事件的实时流,然后实时发布,即维护一个大小为 b 的缓冲区(循环缓冲区)当我根据页面阅读时检查该消息是否在缓冲区中。如果是,则切换到实时订阅。

至此,我意识到如何正确地做到这一点并不十分明显,因此我得出结论,我需要了解解决此问题的正确模式。

那么,有人对正确执行此操作的正确方法有什么建议吗?

当 Rabbit 未能收到消息时(无论出于何种原因,但根据我的经验,仅仅是因为服务已关闭或不可用),您应该能够捕获错误。在这一点上,您可以记录那个——以及任何后续的——失败的尝试,以便在 Rabbit 再次可用时重试。最快的方法是将消息详细信息记录到文件中,然后在适当的时候迭代重新发送

只要您拥有该文件,就不会丢失邮件。

一旦消息进入 Rabbit,并且您对架构的其余部分有信心,可以安全地假设消息将在它们应该出现的位置结束,并且您不需要做进一步的持久性工作。

虽然 RabbitMQ 无法参与真正的全局 (XA) 事务,但您可以使用 Spring 事务管理将数据库事务与 Rabbit 事务同步,这样如果任一更新失败,两个事务都将滚动背部。有一个(非常)小的时间漏洞,一个人可能会提交但另一个人不会提交,因此您确实需要处理这种可能性。

有关详细信息,请参阅 Dave Syer's Javaworld Article