Axon Saga 中执行信息的持久化

Persistence of execution information in Axon Saga

我们正在使用 Axon 框架来实现 Java 中的 Saga 模式。 Axon 使用两个表(ASSOCIATION_VALUE_ENTRYSAGA_ENTRY)来存储传奇的每个步骤之后的所有必要信息。并在过程结束时(如果正确,或者在错误的情况下,所有补偿都已执行),它会删除寄存器。

如果由于任何原因,在发生错误后无法执行补偿,我们可以根据存储的信息在失败的地方恢复执行。到这里为止一切正常

当我们想要提高流程的弹性时,问题就出现了,我们检查了如果服务在 saga 执行期间终止会发生什么。根据上述,我们期望执行的信息被持久化到表中,但它们是空的:该信息仅在由于补偿错误导致进程无法继续时出现(并且没有执行最终的删除操作) ).

分析 Axon 的 JpaSagaStore class 实现的源代码,与数据库的交互(插入、更新和删除)通过 flush[=74= 持久化] 而不是 提交 。全局提交在 AbstractUnitOfWork class 中管理(据我们了解)。这是我们有疑问的地方:

  • 根据文献,flush写入数据库,但寄存器处于READ_UNCOMMITED状态。在数据库中看到它们的唯一方法是激活 READ_UNCOMMITED 隔离级别,'dirty reads' 有问题,对吧?还会有任何额外的 consideration/issue 需要考虑吗?
  • Axon 是否有替代方案来确保 saga 寄存器的持久性?主要是如果我们无法激活 READ_UNCOMMITED 模式(由于内部政策)。

编辑:

总结了很多,都从这个方法开始

public void startSaga(SagaWorkflow sagaWorkflow, Serializable sagaInput) {
  StartSagaEvt startSagaEvt = StartSagaEvt.builder().sagaWorkflow(sagaWorkflow).sagaInput(sagaInput).build();

  eventBus.publish(GenericEventMessage.asEventMessage(startSagaEvt));
  }

其中:

  • eventBus是Axon的内部
  • sagaInput 只是一个 Serializable 具有一些输入值
  • SagaWorkflow是一个Serializable,它对整个saga flow进行建模,其主要属性是一个节点的LinkedList(saga的不同步骤,每个步骤可以有不同的逻辑)
  • StartSagaEvt 只是模拟发送到总线的事件的 POJO

在此之后,Axon 执行所有 'magic' 并最终到达内部代码: AnnotatedSagaRepository.doCreateInstance --> AnnotatedSagaRepository.storeSaga --> [...] --> JpaSagaStore.insertSaga

public void insertSaga(Class<?> sagaType, String sagaIdentifier, Object saga, Set<AssociationValue> associationValues) {
    EntityManager entityManager = entityManagerProvider.getEntityManager();
    
    AbstractSagaEntry<?> entry = createSagaEntry(saga, sagaIdentifier, serializer);
    entityManager.persist(entry);
    for (AssociationValue associationValue : associationValues) {
        storeAssociationValue(entityManager, sagaType, sagaIdentifier, associationValue);
    }
    if (logger.isDebugEnabled()) {
        logger.debug("Storing saga id {} as {}", sagaIdentifier, serializedSagaAsString(entry));
    }
    if (useExplicitFlush) {
        entityManager.flush();
    }
}

这同样适用于 updatedelete 阶段。据我所知,commit/rollback 的所有句柄都在 class AbstractUnitOfWork 中执行,它恰好在整个传奇流程结束时进行干预。

这让我想到以下 considerations/questions: 在整个过程中保持事务打开而不是在每个步骤之后提交有什么意义?如果由于任何原因该过程失败、宕机、无法访问数据库、...所有保存的信息都将丢失。

此行为一定有设计原因,但我看不到。或者也许有一个配置可以更改它(希望如此,尽管我对此表示怀疑)。

提前感谢您的任何评论!

编辑 2

实际上,我们将其用作一种状态机,其中传奇流程是一系列步骤,每个步骤都有一个动作和一个补偿,我们从一个跳到另一个直到到达“END”状态。

@Saga
class GenericSaga {

    private EventBus eventBus;

    private CustomCommandGateway commandGateway;
    
    [...]

    @StartSaga
    @SagaEventHandler(associationProperty = "sagaId")
    public void startStep(StartSagaEvt startSagaEvt) {
        // Initializes de GenericSaga and associate several properties with SagaLifecycle.associateWith(key, value);
        [...]
        // Transit to the next (first) step
        eventBus.publish(GenericEventMessage.asEventMessage(new StepSagaEvt(startSagaEvt)));
    }

    @SagaEventHandler(associationProperty = "sagaId")
    public void nextStep(StepSagaEvt stepSagaEvt) {
        // Identifies what is the next step in the defined flow, considering if it should be executed sequentially or concurrently, or if it is the end of the flow and then call the SagaLifecycle.end()
        [...]
        // Also checks if it has to execute the compensation logic of the step
        [...]
        // Execute
        Serializable actionOutput = commandGateway.sendAndWaitEx(stepAction.getActionInput());
    }

    @SagaEventHandler(associationProperty = "sagaId")
    public void resumeSaga(ResumeSagaEvt resumeSagaEvt) {
        // Recover information from the execution that we want to resume
        [...]
        // Transit to the next step
        eventBus.publish(GenericEventMessage.asEventMessage(new StepSagaEvt(resumeSagaEvt)));
    }

}

如您所见,我们没有 endSaga 注释,也许这就是问题所在。但在我们目前的情况下,我们已经向前推进,并且已经定义了 JpaSagaStore 的自定义实现,以便在 insertSaga 和 updateSaga 方法中强制执行本地事务。

根据我的理解,我认为您在某种程度上滥用了 Axon Framework 中的 Saga 组件。根据您的问题,我假设您正在尝试使用自己的 SagaWorkflow 对象构建 'state machine' 的形式。如果是这样的话,我不得不说这不是 Axon 打算使用 Sagas 的方式。

除此之外,让我给你一个 Saga 应该是什么样子的伪样本。

@Saga
class SagaWorkflow {

    private transient CommandGateway commandGateway;

    @StartSaga
    @SagaEventHandler(associationProperty = "yourProperty")
    public void on(SagaInputEvent event) {
        // validate, associate with another property and fire a command
        SagaLifecycle.associateWith("associationPropertyKey", "associationPropertyValue");
        commandGateway.send(new GivenCommand());
    }

    @SagaEventHandler(associationProperty = "associationPropertyValue")
    public void on(AnotherEvent event) {
        // validate and fire a command or finish the saga
        SagaLifecycle.end();
    }

    @EndSaga
    @SagaEventHandler(associationProperty = "anyProperty")
    public void on(FinishSagaEvent event) {
        // check if you need to fire extra commands to tell others it's finished or just do it silently
    }

}
  • @Saga 注释将确保 Axon Framework 为您处理整个 Saga 过程,并在每个 (Saga)EventHandler 执行时将其存储(序列化)到数据库中
  • @SagaEventHandler 将确保 'Event Handling method' 对给定事件作出反应, 如果它包含 associationProperty 作为事件的一部分(为了更好地理解它,我将分享我们的 docs link)
  • @EndSaga 会告诉 Axon Framework 在方法执行后完成 Saga(完成意味着从数据库中删除它)
  • SagaLifecycle 提供了几个 'utilities' 方法来与 Saga 的生命周期和关联进行交互
  • 在示例中,我将 CommandGateway 设为瞬态,因为 Saga 已序列化并存储在数据库中。您也不会使用 Axon 序列化任何外部组件,例如网关

当然,还有更多。 您可以查看 Axon 的 docs。但我希望这能为您提供足够的 material 和想法,以便在 Axon Framework 中更好地使用 Sagas!

韩国