如何将 Axon4 中的 replay/project 事件指向不同的上下文?

How to replay/project events in Axon4 towards a different context?

我正在构建我的第一个事件源系统。它将有多个域,使用以发布生命周期为核心的项目。我如何有效地重播或将两个域的事件重新应用到第三个域内的新聚合?

更具体一点。想象 4 个领域,每个领域都有自己的限界上下文和目的。这些上下文的简短描述:

发布状态遵循生命周期:概念(尚未发布)> 宣布(可选)> 销售 > 售罄(发布结束)。在我的描述中,我专注于已宣布的状态。对于出版领域来说,概念实际上并不是一个薄薄的东西,因为如果出版还不知道一个项目,它总是处于概念中。


我的第一次尝试是设置一个处理传入事件的普通聚合 AnnouncementPublishedEvent。这需要项目满足一些基本要求,如 'it has a name'、'it has a description'、'it has at least one image' 等。这意味着我需要在应用事件之前验证此信息,因此我需要以某种方式在命令中提供一个 project 实例。

在执行此操作时,我怀疑此方法破坏了 CQRS 的目的,我应该查看真正的数据源:事件。我的下一次尝试是创建一个在事件 AnnouncementPublicationRequestedEvent 时开始的 Saga。此 saga 需要审查在给定 projectId 周围发生了哪些事件,并将这些事件应用于这个新的 'published project' 投影,以便(至少)验证请求是否可以被接受。

我对跟踪处理器进行了研究和试验,但无法找到在 Axon 版本 4 中如何完成的一个很好的例子。我还开始阅读有关 Whosebug 的其他几个问题,这让我觉得我可能需要重新考虑我的方法。


不幸的是,无法共享确切的代码,因为它不是开源的,即使我可以,它也远未达到工作状态。我可以使用示例代码来展示我正在尝试做什么。

@Saga
@ProcessingGroup("AnnouncementPublication")
public class AnnouncementPublicationSaga {

   private static int NUMBER_OF_ALLOWED_IMAGES

   private PublicationId publicationId;
   private ProjectId projectId;
   private int numberOfImages = 0;
   //...other fields

   @StartSaga
   @SagaEventHandler(associationProperty = "projectId")
   public void handle(AnnouncementPublicationRequestedEvent event) {
      publicationId = generatePublicationId();

      //set parameters from event for saga to use
      projectId = event.getProjectId();
      targetPublicationStatus = event.getPublicationStatus();
      date = event.getDate();

      //initialize the 'publicated project' aggregate
      //start a replay of associated events for this @ProcessingGroup
   }

   ...

   @SagaEventHandler(associationProperty = "projectId")
   public void handle(ProjectCreatedEvent event) {
      //Verify the project exists and has a valid name
   }

   ...

   /* Assumption* on how AssociationResolver works: */
   @SagaEventHandler(AssociationResolver=MediaProjectAssociator.class )
   public void handle(ProjectImageAdded event) {
      numberOfImages += 1;
   }

   /* Assumption* on how AssociationResolver works: */
   @SagaEventHandler(AssociationResolver=MediaProjectAssociator.class )
   public void handle(ProjectImageRemoved event) {
      numberOfImages -= 1;
   }

   ...

   /* In my head this should trigger if all events have been played
      up to the PublicationRequestedEvent. Or maybe 
   */
   @SagaEventHandler(associationProperty = "publicationId")
   public void handle(ValidationRequestCompleted event) {
      //ValidationResult result = ValidationResult.builder();
      ...
      if (numberOfImages > NUMBER_OF_ALLOWED_IMAGES) {
         //reason to trigger PublicationRequestDeniedEvent
         //update validationResult
      }
      ...
      if (validationResult.isAcceptable()) {
         //Trigger AnnouncementPublicationAcceptedEvent
      } else {
         //Trigger AnnouncementPublicationDeniedEvent
      }
   }

   ...

   @EndSaga
   @SagaEventHandler(associationProperty = "publicationId")
   public void handle(AnnouncementPublicationDeniedEvent event) {
      //do stuff to inform why the publication failed
   }

   @EndSaga
   @SagaEventHandler(associationProperty = "publicationId")
   public void handle(AnnouncementPublicationAcceptedEvent event){
      //do stuff to notify success to user
      //choice: delegate to delivery for actual sharing of data
      //    or  delivery itselfs listens for these events
   }
}

*associationResolver 代码是对其实际工作的假设,因为我还没有接近那部分。我的媒体上下文使用文件 ID 作为聚合标识符,因为并非每个事件都绑定到项目。但是这个传奇需要重播的所有媒体事件都将有一个 projectId 作为其中的字段。欢迎对此提出任何反馈,但这不是我现在的主要问题。


最后的结果应该是:发布记录或尝试记录以及失败原因。

出版物的记录包含与出版物相关的 projectmedia 事件的所有数据。这主要是潜在买家需要做出决定的信息。

出于这个问题的目的,我不希望上述问题得到彻底解决。我只是想知道我对事件的思考是否在正确的轨道上,我重播相关事件的方法是否正确,如果是这样,如何在 Axon4 中完成。

根据 Martin 的问题描述,我假设您有几个 distinct 限界上下文。遵循限界上下文的定义:

Explicitly define the context within which a model applies.

Explicitly set boundaries in terms of team organization, usage within specific parts of the application, and physical manifestations such as code bases and database schemas.

Keep the model strictly consistent within these bounds, but don’t be distracted or confused by issues outside.

由此我想强调的是,在给定的限界上下文中,您对任何组件都说相同的 language/API。 但是,在上下文之间,您将非常有意识地共享,使用专用的上下文映射(例如反腐败层)来确保另一个域不会进入您的域。

综上所述,事件是特定限界上下文的一部分。 因此,理想情况下,使用来自其他上下文的多个事件流到 recreate/replay 另一个上下文中的聚合应该是不可能的。

除此之外,在 Axon 中,聚合只能根据事件重新创建 已经发布了自己。

为了仍然得到一个解决方案,其中给定的应用程序从其他应用程序摄取事件以重新水化聚合,我将采取以下步骤:

  • 有一个专门的组件(例如反腐败层)将传入的事件转换为应用程序中不同形式的消息。
  • 如果这些事件应导致聚合的重建,则您需要将事件转换为命令。在谈论 CQRS 时,Axon 中的聚合基础结构组件适用于命令模型。
  • 所述聚合随后将处理命令、执行一些业务逻辑并作为结果发布一个(或多个)事件。
  • 从现在开始,框架将处理重播给定聚合的所有事件,前提是您遵循事件溯源实践来更新聚合的状态。

最后,我想指出,Axon 提供的与 TrackingEventProcessor 相关的重播的任何细节都用于 CQRS 应用程序查询端的事件处理。

希望这能为您澄清事情,马丁!如果没有,请随时在此答案下发表评论,我会相应地更新我的回复。