分布式数据库事务中的 RabbitMQ 和交付保证
RabbitMQ and Delivery Guarantees in Distributed Database Transaction
我试图了解在分布式数据库事务的上下文中处理 RabbitMQ 交付的正确模式是什么。
为简单起见,我将用伪代码说明我的想法,但实际上我正在使用 Spring AMQP 来实现这些想法。
任何类似
void foo(message) {
processMessageInDatabaseTransaction(message);
sendMessageToRabbitMQ(message);
}
当我们到达 sendMessageToRabbitMQ()
时,processMessageInDatabaseTransaction()
已成功将其更改提交到数据库,或者在到达消息发送代码之前已抛出异常。
我知道对于 sendMessageToRabbitMQ()
我可以使用 Rabbit transactions or publisher confirms 来保证 Rabbit 收到我的消息。
我的兴趣是了解当事情变糟时应该发生什么,即当数据库事务成功时,但在一定时间后确认没有到达(发布者确认)或 Rabbit 事务未能提交(用 Rabbit交易)。
一旦发生这种情况,保证我的消息传递的正确模式是什么?
当然,开发幂等消费者后,我考虑过可以重试发送消息,直到Rabbit确认成功:
void foo(message) {
processMessageInDatabaseTransaction(message);
retryUntilSuccessFull {
sendMessagesToRabbitMQ(message);
}
}
但是这种模式有几个我不喜欢的缺点,首先,如果失败时间延长,我的线程将开始阻塞在这里,我的系统最终将变得无响应。其次,如果我的系统崩溃或关闭会怎样?那时我永远不会传递这些消息,因为它们会丢失。
所以,我想,好吧,我必须先将我的消息写入数据库,处于待处理状态,然后从那里发布我的待处理消息:
void foo(message) {
//transaction commits leaving message in pending status
processMessageInDatabaseTransaction(message);
}
@Poller(every="10 seconds")
void bar() {
for(message in readPendingMessagesFromDbStore()) {
sendPendingMessageToRabbitMQ(message);
if(confirmed) {
acknowledgeMessageInDatabase(message);
}
}
}
如果我未能在我的数据库中确认消息,可能会多次发送消息。
但是现在我引入了其他问题:
- 需要做I/O从数据库发布一条消息,99%的时间会立即成功发布,而无需检查数据库。
- 使轮询器更接近实时传递的困难,因为现在我已经增加了消息发布的延迟。
- 也许还有其他复杂情况,例如保证按顺序交付事件、轮询器执行步进、多个轮询器等。
然后我想好吧,我可以让它更复杂一点,比如,我可以从数据库发布,直到我赶上事件的实时流,然后实时发布,即维护一个大小为 b 的缓冲区(循环缓冲区)当我根据页面阅读时检查该消息是否在缓冲区中。如果是,则切换到实时订阅。
至此,我意识到如何正确地做到这一点并不十分明显,因此我得出结论,我需要了解解决此问题的正确模式。
那么,有人对正确执行此操作的正确方法有什么建议吗?
当 Rabbit 未能收到消息时(无论出于何种原因,但根据我的经验,仅仅是因为服务已关闭或不可用),您应该能够捕获错误。在这一点上,您可以记录那个——以及任何后续的——失败的尝试,以便在 Rabbit 再次可用时重试。最快的方法是将消息详细信息记录到文件中,然后在适当的时候迭代重新发送。
只要您拥有该文件,就不会丢失邮件。
一旦消息进入 Rabbit,并且您对架构的其余部分有信心,可以安全地假设消息将在它们应该出现的位置结束,并且您不需要做进一步的持久性工作。
虽然 RabbitMQ 无法参与真正的全局 (XA) 事务,但您可以使用 Spring 事务管理将数据库事务与 Rabbit 事务同步,这样如果任一更新失败,两个事务都将滚动背部。有一个(非常)小的时间漏洞,一个人可能会提交但另一个人不会提交,因此您确实需要处理这种可能性。
有关详细信息,请参阅 Dave Syer's Javaworld Article。
我试图了解在分布式数据库事务的上下文中处理 RabbitMQ 交付的正确模式是什么。
为简单起见,我将用伪代码说明我的想法,但实际上我正在使用 Spring AMQP 来实现这些想法。
任何类似
void foo(message) {
processMessageInDatabaseTransaction(message);
sendMessageToRabbitMQ(message);
}
当我们到达 sendMessageToRabbitMQ()
时,processMessageInDatabaseTransaction()
已成功将其更改提交到数据库,或者在到达消息发送代码之前已抛出异常。
我知道对于 sendMessageToRabbitMQ()
我可以使用 Rabbit transactions or publisher confirms 来保证 Rabbit 收到我的消息。
我的兴趣是了解当事情变糟时应该发生什么,即当数据库事务成功时,但在一定时间后确认没有到达(发布者确认)或 Rabbit 事务未能提交(用 Rabbit交易)。
一旦发生这种情况,保证我的消息传递的正确模式是什么?
当然,开发幂等消费者后,我考虑过可以重试发送消息,直到Rabbit确认成功:
void foo(message) {
processMessageInDatabaseTransaction(message);
retryUntilSuccessFull {
sendMessagesToRabbitMQ(message);
}
}
但是这种模式有几个我不喜欢的缺点,首先,如果失败时间延长,我的线程将开始阻塞在这里,我的系统最终将变得无响应。其次,如果我的系统崩溃或关闭会怎样?那时我永远不会传递这些消息,因为它们会丢失。
所以,我想,好吧,我必须先将我的消息写入数据库,处于待处理状态,然后从那里发布我的待处理消息:
void foo(message) {
//transaction commits leaving message in pending status
processMessageInDatabaseTransaction(message);
}
@Poller(every="10 seconds")
void bar() {
for(message in readPendingMessagesFromDbStore()) {
sendPendingMessageToRabbitMQ(message);
if(confirmed) {
acknowledgeMessageInDatabase(message);
}
}
}
如果我未能在我的数据库中确认消息,可能会多次发送消息。
但是现在我引入了其他问题:
- 需要做I/O从数据库发布一条消息,99%的时间会立即成功发布,而无需检查数据库。
- 使轮询器更接近实时传递的困难,因为现在我已经增加了消息发布的延迟。
- 也许还有其他复杂情况,例如保证按顺序交付事件、轮询器执行步进、多个轮询器等。
然后我想好吧,我可以让它更复杂一点,比如,我可以从数据库发布,直到我赶上事件的实时流,然后实时发布,即维护一个大小为 b 的缓冲区(循环缓冲区)当我根据页面阅读时检查该消息是否在缓冲区中。如果是,则切换到实时订阅。
至此,我意识到如何正确地做到这一点并不十分明显,因此我得出结论,我需要了解解决此问题的正确模式。
那么,有人对正确执行此操作的正确方法有什么建议吗?
当 Rabbit 未能收到消息时(无论出于何种原因,但根据我的经验,仅仅是因为服务已关闭或不可用),您应该能够捕获错误。在这一点上,您可以记录那个——以及任何后续的——失败的尝试,以便在 Rabbit 再次可用时重试。最快的方法是将消息详细信息记录到文件中,然后在适当的时候迭代重新发送。
只要您拥有该文件,就不会丢失邮件。
一旦消息进入 Rabbit,并且您对架构的其余部分有信心,可以安全地假设消息将在它们应该出现的位置结束,并且您不需要做进一步的持久性工作。
虽然 RabbitMQ 无法参与真正的全局 (XA) 事务,但您可以使用 Spring 事务管理将数据库事务与 Rabbit 事务同步,这样如果任一更新失败,两个事务都将滚动背部。有一个(非常)小的时间漏洞,一个人可能会提交但另一个人不会提交,因此您确实需要处理这种可能性。
有关详细信息,请参阅 Dave Syer's Javaworld Article。