如何在事件驱动架构中控制消息的幂等性?
How to control idempotency of messages in an event-driven architecture?
我正在做一个项目,其中 DynamoDB
被用作数据库,并且应用程序的每个用例都由 message
在一个项目被 created/updated 之后发布在数据库中。目前代码遵循这种方法:
repository.save(entity);
messagePublisher.publish(event);
Udi Dahan 有一个名为 Reliable Messaging Without Distributed Transactions
的视频,他在视频中谈到了系统在保存到数据库后但在发布消息之前可能发生故障的情况的解决方案,因为消息不是事务的一部分。但在他的解决方案中,我认为他假设使用 SQL
数据库,因为该过程涉及作为事务的一部分保存正在处理的消息的 correlationId、实体修改和要发布的消息。使用 NoSQL
数据库我想不出一种干净的方法来存储有关消息的信息。
一种解决方案是使用 DynamoDB
streams
并订阅使用 Lambda
或其他服务发布的事件,将它们转换为特定领域的事件。我的问题是我无法从域逻辑发送消息,逻辑将分布在处理消息的服务中,Lambda/service
对更改做出反应,解决方案将是特定于平台的.
还有其他方法可以解决这个问题吗?
我不能说基于 DynamoDB 的特定解决方案,因为我从未使用过该引擎。但是我在 MongoDB 之上构建了一个事件驱动系统,因此我可以分享我的经验,您可能会发现对您的案例有用。
您可以有不同的方法:
1) 基于事件溯源方法,您可以只保存 events/messages 您在交易中产生的用例。在 Mongo 当你只是 inserting/appending 新项目到同一个集合时你可以确保原子性。无论如何,如果引擎不提供该功能,则查询操作非常集中,您至少可以减少出错的可能性。
一旦存储了所有事件,您就可以使用它们并将它们投射到给定状态,然后将更新后的状态保存在另一个事务中。
这里你必须处理最终一致性,因为在你预测事件之前,你的读取模型中的数据将是陈旧的。
2) 另一种方法是应用 UnitOfWork
模式,在其中缓存所有查询操作 (insert/update/delete) 以保存事件和状态。用例完成后,您将对数据库执行所有缓存的查询(刷新)。这样,虽然操作不是原子的,但您再次将它们集中到足以最大限度地减少错误。
当然,如果您需要这种能力,最好是使用 ACID 数据库,任何其他方法都是接近它的解决方法。
关于发布事件我不知道你的意思是它们被发布到消息传输机制,如rabbitmq、Kafka等。但这必须是一个后台进程,你从数据库中获取事件并发布他们为了打破同一事务中的 2 阶段提交。
我正在做一个项目,其中 DynamoDB
被用作数据库,并且应用程序的每个用例都由 message
在一个项目被 created/updated 之后发布在数据库中。目前代码遵循这种方法:
repository.save(entity);
messagePublisher.publish(event);
Udi Dahan 有一个名为 Reliable Messaging Without Distributed Transactions
的视频,他在视频中谈到了系统在保存到数据库后但在发布消息之前可能发生故障的情况的解决方案,因为消息不是事务的一部分。但在他的解决方案中,我认为他假设使用 SQL
数据库,因为该过程涉及作为事务的一部分保存正在处理的消息的 correlationId、实体修改和要发布的消息。使用 NoSQL
数据库我想不出一种干净的方法来存储有关消息的信息。
一种解决方案是使用 DynamoDB
streams
并订阅使用 Lambda
或其他服务发布的事件,将它们转换为特定领域的事件。我的问题是我无法从域逻辑发送消息,逻辑将分布在处理消息的服务中,Lambda/service
对更改做出反应,解决方案将是特定于平台的.
还有其他方法可以解决这个问题吗?
我不能说基于 DynamoDB 的特定解决方案,因为我从未使用过该引擎。但是我在 MongoDB 之上构建了一个事件驱动系统,因此我可以分享我的经验,您可能会发现对您的案例有用。
您可以有不同的方法:
1) 基于事件溯源方法,您可以只保存 events/messages 您在交易中产生的用例。在 Mongo 当你只是 inserting/appending 新项目到同一个集合时你可以确保原子性。无论如何,如果引擎不提供该功能,则查询操作非常集中,您至少可以减少出错的可能性。
一旦存储了所有事件,您就可以使用它们并将它们投射到给定状态,然后将更新后的状态保存在另一个事务中。
这里你必须处理最终一致性,因为在你预测事件之前,你的读取模型中的数据将是陈旧的。
2) 另一种方法是应用 UnitOfWork
模式,在其中缓存所有查询操作 (insert/update/delete) 以保存事件和状态。用例完成后,您将对数据库执行所有缓存的查询(刷新)。这样,虽然操作不是原子的,但您再次将它们集中到足以最大限度地减少错误。
当然,如果您需要这种能力,最好是使用 ACID 数据库,任何其他方法都是接近它的解决方法。
关于发布事件我不知道你的意思是它们被发布到消息传输机制,如rabbitmq、Kafka等。但这必须是一个后台进程,你从数据库中获取事件并发布他们为了打破同一事务中的 2 阶段提交。