Mass Transit:确保消息类型不同时的消息处理顺序
Mass Transit: ensure message processing order when there are different message types
我是公共交通的新手,我想了解它是否对我的场景有帮助。
我正在构建一个使用 CQRS 事件源架构实现的示例应用程序,我需要一个服务总线来将命令堆栈创建的事件分派到查询堆栈非规范化器。
让我们假设在我们的域中有一个聚合,我们称之为 Photo,以及两个不同的域事件:PhotoUploaded 和 PhotoArchived.
在这种情况下,我们有两种不同的消息类型,默认的 Mass Transit 行为是创建两种不同的 RabbitMq 交换:一种用于 PhotoUploaded 消息类型,另一种用于 PhotoArchived 消息类型。
让我们假设有一个名为 PhotoDenormalizer 的反规范化器:此服务将成为两种消息类型的消费者,因为照片读取模型必须在上传或上传照片时更新存档。
给定默认的 Mass Transit 拓扑,将有两个不同的交换,因此无法保证不同类型事件之间的消息处理顺序:我们拥有的唯一保证是将处理所有相同类型的事件顺序,但我们不能保证不同类型事件之间的处理顺序(请注意,鉴于我的示例的事件语义,处理顺序很重要)。
我该如何处理这种情况? Mass Transit 是否适合我的需要?我是否完全忽略了域事件调度的要点?
免责声明:这不是您问题的答案,而是一条预防性信息,说明您为什么不应该做您计划的事情要做。
虽然像 RMQ 这样的消息代理和像 MassTransit 这样的消息中间件库非常适合集成,但我强烈建议不要使用消息代理进行事件溯源。我可以参考我的旧答案 Event-sourcing: when (and not) should I use Message Queue? 解释其背后的原因。
您发现自己的原因之一 - 活动顺序永远无法保证。
另一个明显的原因是,从通过消息代理发布的事件构建读取模型有效地消除了重放的可能性,并构建了需要从一开始就开始处理事件的新读取模型,但所有这些get 是正在发布的事件现在。
聚合形成事务边界,因此每个命令都需要保证它在一个事务内完成。虽然 MT 支持 transaction middleware,但它仅保证您获得支持它们的依赖项的事务,但不保证消费者主体中的 context.Publish(@event)
,因为 RMQ 不支持事务。您很有可能提交更改而不是在读取端获取事件。因此,事件存储的经验法则是您应该能够从存储 订阅更改流 ,而不是从您的代码发布事件,除非这些是集成事件而不是领域事件。
对于事件溯源,至关重要的是每个读取模型在其投影的事件流中保留自己的检查点。消息代理不会给你那种权力,因为 "checkpoint" 实际上是你的队列,一旦消息从队列中消失 - 它就永远消失了,不会再回来了。
关于实际问题:
您可以使用 message topology configuration 为不同的消息设置相同的实体名称,然后它们将被发布到同一个交换器,但这属于 "abuse" 类别,就像 Chris 在那个页面。我没试过,但你绝对可以尝试。消息 CLR 类型是元数据的一部分,因此不应该存在反序列化问题。
但同样,将消息放在同一个交换器中不会给您任何排序保证,除了所有消息都将进入一个队列以供消费服务这一事实。
您至少必须根据聚合 ID 设置分区过滤器,以防止并行处理同一聚合的多条消息。顺便说一下,这对于集成也很有用。我们就是这样做的:
void AddHandler<T>(Func<ConsumeContext<T>, string> partition) where T : class
=> ep.Handler<T>(
c => appService.Handle(c, aggregateStore),
hc => hc.UsePartitioner(8, partition));
AddHandler<InternalCommands.V1.Whatever>(c => c.Message.StreamGuid);
我是公共交通的新手,我想了解它是否对我的场景有帮助。 我正在构建一个使用 CQRS 事件源架构实现的示例应用程序,我需要一个服务总线来将命令堆栈创建的事件分派到查询堆栈非规范化器。
让我们假设在我们的域中有一个聚合,我们称之为 Photo,以及两个不同的域事件:PhotoUploaded 和 PhotoArchived.
在这种情况下,我们有两种不同的消息类型,默认的 Mass Transit 行为是创建两种不同的 RabbitMq 交换:一种用于 PhotoUploaded 消息类型,另一种用于 PhotoArchived 消息类型。
让我们假设有一个名为 PhotoDenormalizer 的反规范化器:此服务将成为两种消息类型的消费者,因为照片读取模型必须在上传或上传照片时更新存档。
给定默认的 Mass Transit 拓扑,将有两个不同的交换,因此无法保证不同类型事件之间的消息处理顺序:我们拥有的唯一保证是将处理所有相同类型的事件顺序,但我们不能保证不同类型事件之间的处理顺序(请注意,鉴于我的示例的事件语义,处理顺序很重要)。
我该如何处理这种情况? Mass Transit 是否适合我的需要?我是否完全忽略了域事件调度的要点?
免责声明:这不是您问题的答案,而是一条预防性信息,说明您为什么不应该做您计划的事情要做。
虽然像 RMQ 这样的消息代理和像 MassTransit 这样的消息中间件库非常适合集成,但我强烈建议不要使用消息代理进行事件溯源。我可以参考我的旧答案 Event-sourcing: when (and not) should I use Message Queue? 解释其背后的原因。
您发现自己的原因之一 - 活动顺序永远无法保证。
另一个明显的原因是,从通过消息代理发布的事件构建读取模型有效地消除了重放的可能性,并构建了需要从一开始就开始处理事件的新读取模型,但所有这些get 是正在发布的事件现在。
聚合形成事务边界,因此每个命令都需要保证它在一个事务内完成。虽然 MT 支持 transaction middleware,但它仅保证您获得支持它们的依赖项的事务,但不保证消费者主体中的 context.Publish(@event)
,因为 RMQ 不支持事务。您很有可能提交更改而不是在读取端获取事件。因此,事件存储的经验法则是您应该能够从存储 订阅更改流 ,而不是从您的代码发布事件,除非这些是集成事件而不是领域事件。
对于事件溯源,至关重要的是每个读取模型在其投影的事件流中保留自己的检查点。消息代理不会给你那种权力,因为 "checkpoint" 实际上是你的队列,一旦消息从队列中消失 - 它就永远消失了,不会再回来了。
关于实际问题:
您可以使用 message topology configuration 为不同的消息设置相同的实体名称,然后它们将被发布到同一个交换器,但这属于 "abuse" 类别,就像 Chris 在那个页面。我没试过,但你绝对可以尝试。消息 CLR 类型是元数据的一部分,因此不应该存在反序列化问题。
但同样,将消息放在同一个交换器中不会给您任何排序保证,除了所有消息都将进入一个队列以供消费服务这一事实。
您至少必须根据聚合 ID 设置分区过滤器,以防止并行处理同一聚合的多条消息。顺便说一下,这对于集成也很有用。我们就是这样做的:
void AddHandler<T>(Func<ConsumeContext<T>, string> partition) where T : class
=> ep.Handler<T>(
c => appService.Handle(c, aggregateStore),
hc => hc.UsePartitioner(8, partition));
AddHandler<InternalCommands.V1.Whatever>(c => c.Message.StreamGuid);