在 DDD 的上下文中应该如何处理事件订阅和分发
How should event subscription and dispatching be handled in the context of DDD
据我了解,DDD 中有两种类型的事件。 运行同进程下跨多进程。
事件订阅和分发应该如何处理?
我将提供下订单的示例。当您下订单(第一个有界上下文)时,您必须更新库存(第二个 BC)。
因此,在 OrderAggregate 中,您会有一些创建订单的方法,最后它会添加一个事件,例如 OrderPlacedEvent,到 Domain.Events 的列表中。
现在假设在第一个限界上下文中您将发送一封确认电子邮件。谁应该通知 SendEmailEventHandler 实际发送电子邮件?
那么谁负责将事件发送给所有相关方,在本例中为 SendEmailEventHandler?
此外,由于事务应该扩展到第二个限界上下文,谁应该派发事件?我知道一个消息总线,但如果你没有?
我现在的设计方式是,在我的 Domain.Aggregate 中,我有一个 AddEvent(IDomainEvent event) 方法,该方法由 AggregateRoot class(基础 class对于域聚合)。
当聚合做某事时,它会将来自特定操作的事件添加到该列表中。
StartOrder() => OrderAggregate 方法
OrderStartedEvent() => 域事件
在 Application.Layer 中,我有一个 UseCaseHandler(操场),我在其中调用聚合方法和存储库(仅此而已)。
在 Infrastructure 中,我有一个存储库,它抽象了 ORM,只提供了添加、更新和删除的方法 + 一个通过它的 id 获取聚合的方法。
根据我当前的设计,存储库是在提交之前分派事件的存储库。
这就是 repo 的样子
protected override async Task AddAsync(TAggregate aggregate, CancellationToken token = default(CancellationToken))
{
using (ITransaction transaction = Session.BeginTransaction())
{
await Session.SaveAsync(aggregate, aggregate.Id, token);
/// => dispatch domain.Event to all interested parties
try
{
await transaction.CommitAsync();
}
catch
{
await transaction.RollbackAsync();
throw;
}
}
}
第一个疑问:现在这部分代表调度,我不太确定。存储库是否应该负责调度,从我的角度来看,它应该但我仍然不确定?
这就是应用程序 UseCaseHandler 的样子
private IOrderRepository _orderRepository;
public async Task HandleAsync(CreateOrderRequest request, CancellationToken? cancellationToken = null)
{
OrderAggregate aggregate = await _orderRepository.GetByIdAsync(request.Id);
aggregate.CreateDraftOrder(); /// => adds the OrderPlacedEvent()
await _orderRepository.CreateOrderAsync(aggregate);
}
第二个未知数:更重要的是在哪里和谁处理订阅?
您想查看 Udi Dahan 的 Reliable Messaging with Distributed Transactions。
总结:如果您需要跨边界可靠地传递消息,那么您需要设计您的系统,以便随着对域模型的更改持久存储出站消息,您需要重试管道以发送所有未收到的消息''t been acknowledged, 你需要在消息上有清晰的身份语义,以便消费者可以识别重复的消息,并且你需要消费者在收到消息的第二个副本时做明智的事情。
另请参阅:Life Beyond Distributed Transactions,作者:Pat Helland。
据我了解,DDD 中有两种类型的事件。 运行同进程下跨多进程。
事件订阅和分发应该如何处理?
我将提供下订单的示例。当您下订单(第一个有界上下文)时,您必须更新库存(第二个 BC)。
因此,在 OrderAggregate 中,您会有一些创建订单的方法,最后它会添加一个事件,例如 OrderPlacedEvent,到 Domain.Events 的列表中。
现在假设在第一个限界上下文中您将发送一封确认电子邮件。谁应该通知 SendEmailEventHandler 实际发送电子邮件?
那么谁负责将事件发送给所有相关方,在本例中为 SendEmailEventHandler?
此外,由于事务应该扩展到第二个限界上下文,谁应该派发事件?我知道一个消息总线,但如果你没有?
我现在的设计方式是,在我的 Domain.Aggregate 中,我有一个 AddEvent(IDomainEvent event) 方法,该方法由 AggregateRoot class(基础 class对于域聚合)。
当聚合做某事时,它会将来自特定操作的事件添加到该列表中。
StartOrder() => OrderAggregate 方法
OrderStartedEvent() => 域事件
在 Application.Layer 中,我有一个 UseCaseHandler(操场),我在其中调用聚合方法和存储库(仅此而已)。
在 Infrastructure 中,我有一个存储库,它抽象了 ORM,只提供了添加、更新和删除的方法 + 一个通过它的 id 获取聚合的方法。
根据我当前的设计,存储库是在提交之前分派事件的存储库。
这就是 repo 的样子
protected override async Task AddAsync(TAggregate aggregate, CancellationToken token = default(CancellationToken))
{
using (ITransaction transaction = Session.BeginTransaction())
{
await Session.SaveAsync(aggregate, aggregate.Id, token);
/// => dispatch domain.Event to all interested parties
try
{
await transaction.CommitAsync();
}
catch
{
await transaction.RollbackAsync();
throw;
}
}
}
第一个疑问:现在这部分代表调度,我不太确定。存储库是否应该负责调度,从我的角度来看,它应该但我仍然不确定?
这就是应用程序 UseCaseHandler 的样子
private IOrderRepository _orderRepository;
public async Task HandleAsync(CreateOrderRequest request, CancellationToken? cancellationToken = null)
{
OrderAggregate aggregate = await _orderRepository.GetByIdAsync(request.Id);
aggregate.CreateDraftOrder(); /// => adds the OrderPlacedEvent()
await _orderRepository.CreateOrderAsync(aggregate);
}
第二个未知数:更重要的是在哪里和谁处理订阅?
您想查看 Udi Dahan 的 Reliable Messaging with Distributed Transactions。
总结:如果您需要跨边界可靠地传递消息,那么您需要设计您的系统,以便随着对域模型的更改持久存储出站消息,您需要重试管道以发送所有未收到的消息''t been acknowledged, 你需要在消息上有清晰的身份语义,以便消费者可以识别重复的消息,并且你需要消费者在收到消息的第二个副本时做明智的事情。
另请参阅:Life Beyond Distributed Transactions,作者:Pat Helland。