MQ - 如何在非事务、轻量级环境中保证消息传递?
MQ - How to guarantee message delivery in a non-transacted, lightweight environment?
如何在非事务、轻量级环境中保证消息传递?
例如:
- 正常情况:写入数据库,提交,发送消息到ZeroMQ|Redis|OtherMQ,消费者拉取消息继续处理...
- 0,05% 情况:写入数据库,提交,应用程序死亡!,没有发送消息,没有消费者拉取消息,处理不完整。
如何在这种情况下不丢失消息(避免不发送消息)?
编辑: 消息必须恰好传递一次。
在这种情况下,您有 2 个共享资源(数据库和队列)并且您希望将它们一起处理。如果消息发送到队列,您希望数据库提交。如果未成功发送,您希望数据库不提交,反之亦然。这就是类似于 2PC 的全局事务机制。但是要实现一个全局的事务机制并不容易,成本也很高。
我建议你至少在生产者端实施一种策略,在消费者端实施幂等性以提供一致性。
您应该在生产者端的数据库中创建一条消息 table,并在发送到队列之前将消息保存到此 table。然后使用计划线程(这里可能有多个线程来增加吞吐量,但如果您的消息需要按照它们产生的顺序使用时要小心)或任何其他您可以将它们发送到队列并将它们标记为已发送以确保已发送的消息将不再发送。即使您这样做了,在某些情况下您的消息也可能会被发送多次(例如,您将消息发送到队列并且您的应用程序在将消息标记为已发送之前崩溃了)。但这不是问题,因为我们已经想在生产者端实现至少一次策略,这意味着我们希望消息至少发送一次到队列。
要防止消费者使用在生产者端多次生成的相同消息,您应该实施幂等消费者。简单地说,您可以将已消费消息的 id 保存到消费者端的数据库 table 中,并且在处理来自队列的消息之前,您可以检查它是否已被消费。如果它已经被消耗,你应该忽略它并得到下一条消息。
当然还有其他选项可以在微服务环境中提供一致性。您可以在这个很棒的博客上找到其他解决方案 - https://www.nginx.com/blog/event-driven-data-management-microservices/。我上面解释的解决方案也存在于这个博客中。您可以在使用本地交易发布事件部分找到它。
这里可能是一个简单的方法。
假设您有交易:
- 向数据库写入数据
- 通过 ZMQ 发送消息
- 写入 DB 发送成功
所以假设您的应用程序在您执行第 2 步或第 3 步时崩溃。如果是这样,您不知道最后一条消息是否确实收到了客户队列,并且您必须在重新启动所有消息后重新发送而无需最后确认(步骤 3).
问题出在消费者方面,因为他们可能会收到两次消息。要解决此问题,您可以在每条消息中发送一个始终递增的事务 ID。消费者必须注意最后一条消息的事务 ID。当传入消息的事务 ID 不高于最后一条消息的事务 ID 时,可以忽略该消息。
现在的问题是您是否可以修改消息结构以及可以使用哪个事务 ID。
如何在非事务、轻量级环境中保证消息传递?
例如:
- 正常情况:写入数据库,提交,发送消息到ZeroMQ|Redis|OtherMQ,消费者拉取消息继续处理...
- 0,05% 情况:写入数据库,提交,应用程序死亡!,没有发送消息,没有消费者拉取消息,处理不完整。
如何在这种情况下不丢失消息(避免不发送消息)?
编辑: 消息必须恰好传递一次。
在这种情况下,您有 2 个共享资源(数据库和队列)并且您希望将它们一起处理。如果消息发送到队列,您希望数据库提交。如果未成功发送,您希望数据库不提交,反之亦然。这就是类似于 2PC 的全局事务机制。但是要实现一个全局的事务机制并不容易,成本也很高。
我建议你至少在生产者端实施一种策略,在消费者端实施幂等性以提供一致性。
您应该在生产者端的数据库中创建一条消息 table,并在发送到队列之前将消息保存到此 table。然后使用计划线程(这里可能有多个线程来增加吞吐量,但如果您的消息需要按照它们产生的顺序使用时要小心)或任何其他您可以将它们发送到队列并将它们标记为已发送以确保已发送的消息将不再发送。即使您这样做了,在某些情况下您的消息也可能会被发送多次(例如,您将消息发送到队列并且您的应用程序在将消息标记为已发送之前崩溃了)。但这不是问题,因为我们已经想在生产者端实现至少一次策略,这意味着我们希望消息至少发送一次到队列。
要防止消费者使用在生产者端多次生成的相同消息,您应该实施幂等消费者。简单地说,您可以将已消费消息的 id 保存到消费者端的数据库 table 中,并且在处理来自队列的消息之前,您可以检查它是否已被消费。如果它已经被消耗,你应该忽略它并得到下一条消息。
当然还有其他选项可以在微服务环境中提供一致性。您可以在这个很棒的博客上找到其他解决方案 - https://www.nginx.com/blog/event-driven-data-management-microservices/。我上面解释的解决方案也存在于这个博客中。您可以在使用本地交易发布事件部分找到它。
这里可能是一个简单的方法。
假设您有交易:
- 向数据库写入数据
- 通过 ZMQ 发送消息
- 写入 DB 发送成功
所以假设您的应用程序在您执行第 2 步或第 3 步时崩溃。如果是这样,您不知道最后一条消息是否确实收到了客户队列,并且您必须在重新启动所有消息后重新发送而无需最后确认(步骤 3).
问题出在消费者方面,因为他们可能会收到两次消息。要解决此问题,您可以在每条消息中发送一个始终递增的事务 ID。消费者必须注意最后一条消息的事务 ID。当传入消息的事务 ID 不高于最后一条消息的事务 ID 时,可以忽略该消息。
现在的问题是您是否可以修改消息结构以及可以使用哪个事务 ID。