与事件溯源和 CQRS 聚合
Aggregates with Event Sourcing and CQRS
为了处理大量遥测数据并仍然能够对数据执行快速查询,我采用了使用 Azure Functions 和 Azure Cosmos DB 的事件溯源/CQRS 模式。
在我的架构中,入站遥测流存储在充当事件存储的 Cosmos DB 集合中。
为了创建原始遥测数据的物化视图,我将另一个 Azure 函数与 Cosmos DB 触发器结合使用,它在我的事件存储中存储的所有新文档上激活,对这些文档执行转换。
在每个文档的基础上处理一个文档非常容易。
棘手的地方在于,当我需要引用其他文档以计算我的实体化视图时。
例如,当接收到的遥测事件包含相对计数器值(例如特定操作中使用的能量)时。在我的实体化视图中,我想要一个包含所有能源消耗总和的文档。
现在一个简单的实现是在我的物化视图中查看此文档的当前状态,然后将此值增加新接收到的值。
我使用这种方法可能会遇到的问题是,当我必须重新计算我的物化视图时,因为在未来的版本中我需要生成一些额外的视图。
对于重新计算,我只需在我的事件存储中触摸我想要重新计算的所有相关文档,触发再次计算物化视图的 Azure 函数。这将导致文档进入之前处理过的此 Azure 函数。
当重新计算发生时,如果我简单地增加我的总和,我的计数器将不再准确,因为已经是总和一部分的文档将被再次添加。
解决这个重新计算场景(我想到的)的方法是:
- 跟踪所有属于总和的源文档并忽略
已经是总和的一部分的文档的事件
- 跟踪已经是总和的一部分的最新遥测事件的序列号,并在重新计算时忽略序列号低于已经是总和的一部分的事件。
你能给我一些关于如何正确解决这种情况的建议吗?
因此,总结@Mikhail 和@RomanEremin 的评论以及我的想法,这将是处理这些情况的方法:
如果重新计算观看次数:
- 删除现有聚合并从头开始构建重放
活动商店中的活动。
在事件总线交付的情况下 "at-least-once"(Azure 与 CosmosDb 触发器的运行方式是底层 ChangeFeedProcessor 的结果):
- 版本 1:跟踪事件 ID(文档 ID),它们是聚合文档中聚合的一部分,并忽略已经是聚合一部分的事件。
- 版本2:提供源事件的顺序版本(序号),并将聚合所基于的版本存储在聚合文档中。计算聚合时,请根据事件的序列号检查此序列号。如果事件的序列号低于聚合文档:忽略,否则重新计算聚合并更新聚合所基于的序列号。
为了处理大量遥测数据并仍然能够对数据执行快速查询,我采用了使用 Azure Functions 和 Azure Cosmos DB 的事件溯源/CQRS 模式。
在我的架构中,入站遥测流存储在充当事件存储的 Cosmos DB 集合中。
为了创建原始遥测数据的物化视图,我将另一个 Azure 函数与 Cosmos DB 触发器结合使用,它在我的事件存储中存储的所有新文档上激活,对这些文档执行转换。
在每个文档的基础上处理一个文档非常容易。 棘手的地方在于,当我需要引用其他文档以计算我的实体化视图时。
例如,当接收到的遥测事件包含相对计数器值(例如特定操作中使用的能量)时。在我的实体化视图中,我想要一个包含所有能源消耗总和的文档。
现在一个简单的实现是在我的物化视图中查看此文档的当前状态,然后将此值增加新接收到的值。
我使用这种方法可能会遇到的问题是,当我必须重新计算我的物化视图时,因为在未来的版本中我需要生成一些额外的视图。
对于重新计算,我只需在我的事件存储中触摸我想要重新计算的所有相关文档,触发再次计算物化视图的 Azure 函数。这将导致文档进入之前处理过的此 Azure 函数。
当重新计算发生时,如果我简单地增加我的总和,我的计数器将不再准确,因为已经是总和一部分的文档将被再次添加。
解决这个重新计算场景(我想到的)的方法是:
- 跟踪所有属于总和的源文档并忽略 已经是总和的一部分的文档的事件
- 跟踪已经是总和的一部分的最新遥测事件的序列号,并在重新计算时忽略序列号低于已经是总和的一部分的事件。
你能给我一些关于如何正确解决这种情况的建议吗?
因此,总结@Mikhail 和@RomanEremin 的评论以及我的想法,这将是处理这些情况的方法:
如果重新计算观看次数:
- 删除现有聚合并从头开始构建重放 活动商店中的活动。
在事件总线交付的情况下 "at-least-once"(Azure 与 CosmosDb 触发器的运行方式是底层 ChangeFeedProcessor 的结果):
- 版本 1:跟踪事件 ID(文档 ID),它们是聚合文档中聚合的一部分,并忽略已经是聚合一部分的事件。
- 版本2:提供源事件的顺序版本(序号),并将聚合所基于的版本存储在聚合文档中。计算聚合时,请根据事件的序列号检查此序列号。如果事件的序列号低于聚合文档:忽略,否则重新计算聚合并更新聚合所基于的序列号。