是否可以仅针对 RabbitMQ 发布失败场景实施事务性发件箱模式
Is it possible to implement transactional outbox pattern for only RabbitMQ publish fail scenarios
我有一个使用 mongoDB 作为持久性并使用 RabbitMQ 作为消息代理的系统。我有一个挑战,我只想为 RabbitMQ 发布失败场景实施 transactional outbox。我不确定这是否可能,因为我也有使用相同 mongoDB 持久性的消费者,所以当我编写代码涵盖 RabbitMQ 发布失败场景的事务发件箱时,已发布的消息在 [=28] 之前到达消费者=] commitTransaction 所以我的消费者由于延迟无法在 mongoDB 中找到消息。
我的代码如下所示;
1-开始会话交易
2- 使用会话插入到文档中(因此在我调用提交之前它不会持续存在)
3-发布rabbitMQ
4-如果成功commitTransaction
5-如果错误插入发件箱文档,会话比 commitTransaction
6- 如果 mongoDB abortTransaction 出现问题(如果发布成功而 mongoDB 失败,我的消费者首先检查 mongoDB 是否存在,如果不存在则不要什么都不做。)
So the problem is in here messages reaching consumer earlier than
mongoDB persistence, do you advice any solution that covers my
problem?
据我所知,https://microservices.io/patterns/data/transactional-outbox.html 中图片中概述的架构直接映射到 MongoDB 更改流:
- 保持交易在 1 左右
- 在交易中插入发件箱table
- 设置消息中继进程,请求发件箱上的更改流table,并为每个插入的文档向消息代理发布消息
可以重试发布到消息代理,并且在出现任何错误时也可以重试更改流读取。您需要正确跟踪简历令牌,请参阅例如https://docs.mongodb.com/ruby-driver/master/reference/change-streams/#resuming-a-change-stream.
这种方法的局限性:
- 只有一个消息中继进程,没有可伸缩性,也没有冗余 - 如果它死了,你不会收到通知,直到它回来
您提出的解决方案有一组不同的问题,例如,在提交之前发布通知,您会面临通知处理器无法找到它从消息代理中获取的文档的可能性,正如您所说.
所以我想分享我的解决方案。
遗憾的是,无法仅针对失败场景实施事务性发件箱模式。
我的决定是,围绕高可用性创建一个架构;
MongoDB 作为高可用持久性,RabbitMQ 作为高可用消息代理。
我删除了之前编码的所有会话事务并实现了立即写入和发布。
最坏的情况:
1-插入文档(成功)
2- rabbitmq 发布(失败)
3- 插入发件箱(失败)
我的 mongo 中将有未发布的文档。即使在最坏的情况下,我也可以使用另一个应用程序重新发布来自 MongoDB 的消息,但在我面对这种情况之前我不会编写该应用程序,因为我们无法涵盖代码中的所有失败场景。所以我们的消息代理或持久化必须是高可用的。
我有一个使用 mongoDB 作为持久性并使用 RabbitMQ 作为消息代理的系统。我有一个挑战,我只想为 RabbitMQ 发布失败场景实施 transactional outbox。我不确定这是否可能,因为我也有使用相同 mongoDB 持久性的消费者,所以当我编写代码涵盖 RabbitMQ 发布失败场景的事务发件箱时,已发布的消息在 [=28] 之前到达消费者=] commitTransaction 所以我的消费者由于延迟无法在 mongoDB 中找到消息。
我的代码如下所示;
1-开始会话交易
2- 使用会话插入到文档中(因此在我调用提交之前它不会持续存在)
3-发布rabbitMQ
4-如果成功commitTransaction
5-如果错误插入发件箱文档,会话比 commitTransaction
6- 如果 mongoDB abortTransaction 出现问题(如果发布成功而 mongoDB 失败,我的消费者首先检查 mongoDB 是否存在,如果不存在则不要什么都不做。)
So the problem is in here messages reaching consumer earlier than mongoDB persistence, do you advice any solution that covers my problem?
据我所知,https://microservices.io/patterns/data/transactional-outbox.html 中图片中概述的架构直接映射到 MongoDB 更改流:
- 保持交易在 1 左右
- 在交易中插入发件箱table
- 设置消息中继进程,请求发件箱上的更改流table,并为每个插入的文档向消息代理发布消息
可以重试发布到消息代理,并且在出现任何错误时也可以重试更改流读取。您需要正确跟踪简历令牌,请参阅例如https://docs.mongodb.com/ruby-driver/master/reference/change-streams/#resuming-a-change-stream.
这种方法的局限性:
- 只有一个消息中继进程,没有可伸缩性,也没有冗余 - 如果它死了,你不会收到通知,直到它回来
您提出的解决方案有一组不同的问题,例如,在提交之前发布通知,您会面临通知处理器无法找到它从消息代理中获取的文档的可能性,正如您所说.
所以我想分享我的解决方案。
遗憾的是,无法仅针对失败场景实施事务性发件箱模式。
我的决定是,围绕高可用性创建一个架构;
MongoDB 作为高可用持久性,RabbitMQ 作为高可用消息代理。
我删除了之前编码的所有会话事务并实现了立即写入和发布。
最坏的情况:
1-插入文档(成功)
2- rabbitmq 发布(失败)
3- 插入发件箱(失败)
我的 mongo 中将有未发布的文档。即使在最坏的情况下,我也可以使用另一个应用程序重新发布来自 MongoDB 的消息,但在我面对这种情况之前我不会编写该应用程序,因为我们无法涵盖代码中的所有失败场景。所以我们的消息代理或持久化必须是高可用的。