值对象作为@AggregateIdentifier 和@TargetAggregateIdentifier

Value Object as @AggregateIdentifier and @TargetAggregateIdentifier

首先我很抱歉post。有相当多的代码需要展示以详细了解问题,因此丢失了 post 的东西......请阅读所有代码:-)

我正在尝试使用 Axon 框架和 Spring 引导应用程序开发基于事件源的应用程序。我在下面提供了一些 class 定义来构成聚合、命令和事件实现。

我写了一个非常简单的测试应用程序 (Spring),它只是向 Axon 发送一个 CreateAppointmentCommand。这个创建命令使用一个手动分配的 AppointmentId(这是一个 subclass 一个 AbstractId)和 returns 这个 ID 供以后使用。没有错,Appointment 中的构造函数命令处理程序按预期调用,相应的 ApppointmentCreatedEvent 被触发并按 Appoitment class 的预期进行处理。到目前为止,一切都很好。 当我发送带有创建命令返回的 ID 的 ConfirmAppointmentCommand 时,问题就出现了。在这些情况下,我收到一条错误消息:

Command 'ConfirmAppointmentCommand' resulted in org.axonframework.commandhandling.CommandExecutionException(Provided id of the wrong type for class Appointment. Expected: class AppointmentId, got class java.lang.String)

我不明白此设置中与此错误消息相关的一些事情:

  1. 与确认 command/event 相比,为什么创建命令和事件使用相同的方法(至少到目前为止我的理解是这样)却能按预期工作?
  2. 为什么 Axon 抱怨 AppointmentId 作为(可能是聚合)的标识符,而相应的代码(见下文)为 @AggregateIdentier 和 @TargetAggregateIdentier 注释了两种字符串类型?
  3. 我是否可以使用聚合和实体的相同代码将聚合直接存储到持久存储(在本例中为由 Spring 管理并链接到关系数据库的 JPA 存储库)同时被 Axon 使用(我认为我不应该使用参考指南中描述的状态存储聚合方法,因为我仍然希望我的解决方案是事件驱动的以创建和更新约会)?
  4. 这是使用事件机制使聚合状态保持最新的正确方法吗?是否可以使用不同的 Spring @Component class 实现一系列@EventHandler 方法对关系数据库进行 CRUD 操作。在后者中,创建事件按预期处理,约会存储在数据库中。由于之前的错误消息,确认事件未被触发。
  5. 关于第 4 项,我对如果 Axon 重新启动并开始向第 4 项中的事件处理程序发出不同的事件会发生什么感到有点困惑。这不会导致很多数据库错误,因为约会是仍然在数据库中,或者在最坏的情况下,相同约会的无限重复?换句话说,我在这个项目中使用的方法以及我对事件驱动的理解似乎有问题 applications/services.

请查看下面不同的 class 定义以获取更多详细信息。首先,我有根聚合约会,它将同时用作 JPA 实体。

@Aggregate
@Entity
@Table(name = "t_appointment")
public final class Appointment extends AbstractEntity<AppointmentId> {

    //JPA annotated class members left out for brevity

    @PersistenceConstructor
    private Appointment() {
        super(null);
        //Sets all remaining class members to null.
    }

    @CommandHandler
    private Appointment(CreateAppointmentCommand command) {
        super(command.getAggregateId());
        validateFields(getEntityId(), ...);
        AggregateLifecycle.apply(new AppointmentCreatedEvent(getEntityId(), ...);
    }

    @EventSourcingHandler
    private void on(AppointmentCreatedEvent event) {
        validateFields(event.getAggregateId(), ...);
        initFields(event.getAggregateId(), ...);
    }

    private void validateFields(AppointmentId appointmentId, ...) {
        //Check if all arguments are within the required boundaries.
    }

    private void initFields(AppointmentId appointmentId, ...) {
        //Set all class level variables to passed in value.
    }

    @CommandHandler
    private void handle(ConfirmAppointmentCommand command) {
        AggregateLifecycle.apply(new AppointmentConfirmedEvent(command.getAggregateId()));
    }

    @EventSourcingHandler
    private void on(AppointmentConfirmedEvent event) {
        confirm();
    }

    public void confirm() {
        changeState(State.CONFIRMED);
    }   

    //Similar state changing command/event handlers left out for brevity.

    private void changeState(State newState) {
        switch (state) {
        ...
        }
    }

    //All getter methods left out for brevity. The aggregate does NOT provide any setters.

    @Override
    public String toString() {
        return "Appointment [...]";
    }
}

AbstractEntity class 是所有 JPA 实体和聚合的基础 class。此 class 具有以下定义。

@MappedSuperclass
@SuppressWarnings("serial")
public abstract class AbstractEntity<ENTITY_ID extends AbstractId> implements Serializable{

    @EmbeddedId
    private ENTITY_ID entityId;

    @AggregateIdentifier
    private String targetId;


    protected AbstractEntity(ENTITY_ID id) {
        this.LOG = LogManager.getLogger(getClass());
        this.entityId = id;
        this.targetId = id != null ? id.getId() : null;
    }

    public final ENTITY_ID getEntityId() {
        return entityId;
    }

    @Override
    public final int hashCode() {
        final int prime = 31;
        int result = 1;
        result = prime * result + ((entityId == null) ? 0 : entityId.hashCode());
        return result;
    }

    @Override
    public final boolean equals(Object obj) {
        if (this == obj)
            return true;
        if (obj == null)
            return false;
        if (getClass() != obj.getClass())
            return false;
        AbstractEntity<?> other = (AbstractEntity<?>) obj;
        if (entityId == null) {
            if (other.entityId != null)
                return false;
        } else if (!entityId.equals(other.entityId))
            return false;
        return true;
    }
}

entityId(将用作 JPA 实体的主键)是一个 'complex' 值对象,具有以下基本 class 定义。

@MappedSuperclass
@SuppressWarnings("serial")
public abstract class AbstractId implements Serializable{

    @Column(name = "id")
    private String id;


    protected AbstractId() {
        this.id = UUID.randomUUID().toString();
    }

    public final String getId() {
        return id;
    }

    @Override
    public final int hashCode() {
        final int prime = 31;
        int result = 1;
        result = prime * result + ((id == null) ? 0 : id.hashCode());
        return result;
    }

    @Override
    public final boolean equals(Object obj) {
        if (this == obj)
            return true;
        if (obj == null)
            return false;
        if (getClass() != obj.getClass())
            return false;
        AbstractId other = (AbstractId) obj;
        if (id == null) {
            if (other.id != null)
                return false;
        } else if (!id.equals(other.id))
            return false;
        return true;
    }

    public final String toString() {
        return id;
    }
}

在聚合中,使用了许多命令和事件。每个命令都是 Command.

的子class
@SuppressWarnings("serial")
public abstract class Command<AGGREGATE_ID extends AbstractId> implements Serializable{

    private AGGREGATE_ID aggregateId;

    @TargetAggregateIdentifier
    private String targetId;

    protected Command(AGGREGATE_ID aggregateId) {
        if(aggregateId == null) {
            throw new InvalidArgumentException(...);
        }   
        this.aggregateId = aggregateId;
        this.targetId = aggregateId != null ? aggregateId.getId() : null;
    }

    public final AGGREGATE_ID getAggregateId() {
        return aggregateId;
    }   
}

指定命令class(这在我的方法中造成困难)是 ConfirmAppointmentCommand,它实际上只不过是基本命令 class 的具体实现。因此,实施非常简单。

public final class ConfirmAppointmentCommand extends Command<AppointmentId> {
    private static final long serialVersionUID = 6618106729289153342L;

    public ConfirmAppointmentCommand(AppointmentId appointmentId) {
        super(appointmentId);       
    }   
}

CreateAppointmentCommand与ConfirmAppointmentCommand非常相似,定义如下。

public final class CreateAppointmentCommand extends Command<AppointmentId> {
    private static final long serialVersionUID = -5445719522854349344L;

    //Some additional class members left out for brevity.

    public CreateAppointmentCommand(AppointmentId appointmentId, ...) {
        super(appointmentId);

        //Check to verify the provided method arguments are left out.

        //Set all verified class members to the corresponding values.
    }

    //Getters for all class members, no setters are being implemented.

}

对于项目中使用的不同事件,使用了类似的方法。所有事件都子class 一个基本的 DomainEvent class,定义如下。

    @SuppressWarnings("serial")
    public abstract class DomainEvent<T extends AbstractId> implements Serializable{

        private T aggregateId;


        protected DomainEvent(T aggregateId) {
            if(aggregateId == null) {
                throw new InvalidArgumentException(ErrorCodes.AGGREGATE_ID_MISSING);
            }           
            this.aggregateId = aggregateId;
        }

        public final T getAggregateId() {
            return aggregateId;
        }   
    }

AppointmentCreatedEvent 非常简单。

public final class AppointmentCreatedEvent extends DomainEvent<AppointmentId> {
    private static final long serialVersionUID = -5265970306200850734L;

    //Class members left out for brevity

    public AppointmentCreatedEvent(AppointmentId appointmentId, ...) {
        super(appointmentId);

        //Check to verify the provided method arguments are left out.

        //Set all verified class members to the corresponding values.
    }

    //Getters for all class members, no setters are being implemented.
}

最后为了完整起见,AppointmentConfirmedEvent。

public final class AppointmentConfirmedEvent extends DomainEvent<AppointmentId> {
    private static final long serialVersionUID = 5415394808454635999L;

    public AppointmentConfirmedEvent(AppointmentId appointmentId) {
        super(appointmentId);       
    }
}

很少,你做到了 post 结束。首先谢谢你! 你能告诉我哪里出了问题或者我做错了什么吗?

此致, 库尔特

问题 3 从你的第三个问题中,我注意到你不想 使用 Axon 的状态存储聚合方法,而是使用事件溯源。另一方面,您 将聚合存储为状态对象,方法是将其设为实体。

你这样做的目的是什么?如果这是为了使用 Appointment 返回给感兴趣的各方,那么你应该知道你没有在这件事上遵循 CQRS。

Axon 中的 @Aggregate 注释 class 通常指向命令模型。因此,它纯粹用于接收命令,决定是否可以执行该命令的意图表达,并因此发布事件。

已添加,您声明将其放入 Spring 引导应用程序中。 从那里我假设你也在使用 axon-spring-boot-starter 依赖项。当使用 Axon 的 Spring 自动配置时,@Aggregate 用作 "Spring Stereotype"。最重要的是,如果 @Aggregate 注释对象是 注释 @Entity,则自动配置假定您希望按原样存储聚合。因此,它将默认拥有一个状态存储聚合;你说的不是你想要的。

问题 1 和 2 创建命令可能有效,因为这是聚合的起始点。因此,它还没有检索基于标识符的现有格式。

其次,您收到的异常尽管包装在 CommandExecutionException 中,但最初可能来自您的数据库。在 Axon 的代码库中快速搜索文本 Provided id of the wrong type for class 没有任何结果。 请注意,Axon 将假定 ID 始终可转换为 String。因此,专用的 toString() 方法可能有助于不将不需要的信息附加到 String

这部分是 Allard 询问更多信息的地方,因为这可能与聚合本质上现在是状态存储的事实有关。因此,对于给定的聚合,GenericJpaRepository(这是 Axon 自动为您配置当前设置的这个 repo)使用的 JPA 实现中出现异常。

问题 4 和 5 通过 @EventSourcingHandler 注释方法更新聚合并在您的应用程序中有一个独特的 Spring 组件来处理更新预测的事件确实完全没问题。通过 Axon 执行 CQRS 时,我将其视为 "the way to go"。

你最后的顾虑需要我做一个假设。我猜您还没有针对正在使用的 Event Processor 配置任何特定内容。这意味着 Axon 将为您自动配置 TrackingEventProcessor。此实现所做的其中一件事是将 "how far it is with handling events in the event stream" 的进度存储在令牌中。这些标记应与您的预测一起存储,因为它们定义了您的预测在涉及整个事件流时的最新程度。

如果您注意到事件处理组件中的事件处理程序每​​次启动时都会被调用,这对我来说表明 token_entry table 要么不存在,要么正在运行每次启动时清除。

总结 这里有很多东西,绝对希望这能帮助你,库尔特! 如果有什么不清楚的地方,请评论我的回答;我会相应地更新我的回复。