Axon Saga 订阅模式并发问题

Axon Saga subscribing mode concurrency issue

我在使用 Axon 4.3.5 的 Saga 中从跟踪模式切换到订阅模式时发现了意外行为

看来,在订阅模式下,当两个线程同时到达两个@StarSaga方法时,会为同一个关联键值创建两个sagas。 我错过了什么吗?

我有这个来重现它:

@Saga
@ProcessingGroup("Saga")
public class RaceSaga {

    @Inject
    protected transient CommandGateway commandGateway;

    @StartSaga
    @SagaEventHandler(associationProperty = "executionId")
    public void on(Exec exec) {
        commandGateway.sendAndWait(new CreateExecCommand(exec.getExecutionId(), exec.getDescription()));
    }

    @StartSaga
    @SagaEventHandler(associationProperty = "executionId")
    public void on(Risk risk) {
        commandGateway.sendAndWait(new CreateRiskCommand(risk.getExecutionId(), risk.getResult()));
    }
}

@IntegrationTest
class RaceConditionTest extends BaseIntegrationTest {

    @Autowired
    private EventGateway eventGateway;
    @Autowired
    private SagaStore sagaStore;

    @Test
    void sagaRace() {
        var execId = UUID.randomUUID();

        CompletableFuture.runAsync(() -> eventGateway.publish(new Exec(execId.toString(), "desc")));
        CompletableFuture.runAsync(() -> eventGateway.publish(new Risk(execId.toString(), "OK")));

        var association = new AssociationValue("executionId", execId.toString());
        await().during(5, SECONDS)
                .untilAsserted(() -> assertThat(sagaStore.findSagas(RaceSaga.class, association))
                        .hasSize(1));
    }
}

使用跟踪模式时测试通过但订阅失败。 (yml 配置)

老实说,这是测试设置的预期行为,但需要一些解释。

知道下面是订阅(SEP)和跟踪事件处理器(TEP)的主要区别:

  • SubscribingEventProcessor - 在 EventBus 上发布事件的线程中调用,类似于推送机制。
  • TrackingEventProcessor - 在单独的线程中调用,从 EventStore 检索事件,类似于拉动机制。

这确保了无论事件的并发发布方式如何,TEP 都将确保事件处理顺序。

谈到 SEP,情况略有不同,为此我们需要稍微深入研究一下实现。您可以假设两个或多个事件的发布并不过分奇怪。如果领域内有正确的要求,很多聚合实现都会这样做。该框架有一种方法可以将多个事件的这些交易分组到一个批次中。为此,它使用 UnitOfWork。例如,如果您要输入聚合的命令处理函数,则确保 UnitOfWork 处于活动状态以协调生命周期。其中一项任务是将事件配对成一批以供发布。

在您的测试用例中,您直接使用 EventGateway。本质上完全没问题,但测试是在没有启动 UnitOfWork 的情况下设置的,以协调这两个事件按顺序发生。深入研究代码以查看发布到 SEP 的工作原理,您将在这个阶段进入 AbstractEventProcessor。执行验证以检查调用 EventProcessor#publish(List<EventMessage>)UnitOfWork 是否处于活动状态。如果是,则将事件添加到 UnitOfWork.

的右侧阶段

当没有 UnitOfWork (UoW) 处于活动状态时,将立即调用处理程序。

所以,当使用TrackingEventProcessor时,框架会有意识地启动一个UoW来将事件批量处理,以便按顺序处理。当使用 SubscribingEventProcessor 时,这项工作留给用户,假设用户通常会通过 [命令处理 -> 事件发布 -> 事件处理] 的常规流程,这将确保 UoW 处于活动状态.由于在您的集成测试中不是这种情况,因此两个发布操作将立即调用 RaceSagaSagaManager,由于并发性而创建两个实例。

请注意,建议对这些过程使用 TEP。为 Saga 使用 SEP 可能意味着您将在应用程序(错误)关闭期间丢失一些事件。由于 SEP 是一种推送机制,因此无法从这些“丢失的”(从您的事件处理器的角度来看)事件中恢复。 TEP 将解决此问题,因为它会自行处理事件并跟踪流程。

相信这一点可以为你澄清事情@matpiera。