Axon Saga 订阅模式并发问题
Axon Saga subscribing mode concurrency issue
我在使用 Axon 4.3.5 的 Saga 中从跟踪模式切换到订阅模式时发现了意外行为
看来,在订阅模式下,当两个线程同时到达两个@StarSaga方法时,会为同一个关联键值创建两个sagas。
我错过了什么吗?
我有这个来重现它:
@Saga
@ProcessingGroup("Saga")
public class RaceSaga {
@Inject
protected transient CommandGateway commandGateway;
@StartSaga
@SagaEventHandler(associationProperty = "executionId")
public void on(Exec exec) {
commandGateway.sendAndWait(new CreateExecCommand(exec.getExecutionId(), exec.getDescription()));
}
@StartSaga
@SagaEventHandler(associationProperty = "executionId")
public void on(Risk risk) {
commandGateway.sendAndWait(new CreateRiskCommand(risk.getExecutionId(), risk.getResult()));
}
}
@IntegrationTest
class RaceConditionTest extends BaseIntegrationTest {
@Autowired
private EventGateway eventGateway;
@Autowired
private SagaStore sagaStore;
@Test
void sagaRace() {
var execId = UUID.randomUUID();
CompletableFuture.runAsync(() -> eventGateway.publish(new Exec(execId.toString(), "desc")));
CompletableFuture.runAsync(() -> eventGateway.publish(new Risk(execId.toString(), "OK")));
var association = new AssociationValue("executionId", execId.toString());
await().during(5, SECONDS)
.untilAsserted(() -> assertThat(sagaStore.findSagas(RaceSaga.class, association))
.hasSize(1));
}
}
使用跟踪模式时测试通过但订阅失败。 (yml 配置)
老实说,这是测试设置的预期行为,但需要一些解释。
知道下面是订阅(SEP)和跟踪事件处理器(TEP)的主要区别:
SubscribingEventProcessor
- 在 EventBus
上发布事件的线程中调用,类似于推送机制。
TrackingEventProcessor
- 在单独的线程中调用,从 EventStore
检索事件,类似于拉动机制。
这确保了无论事件的并发发布方式如何,TEP 都将确保事件处理顺序。
谈到 SEP,情况略有不同,为此我们需要稍微深入研究一下实现。您可以假设两个或多个事件的发布并不过分奇怪。如果领域内有正确的要求,很多聚合实现都会这样做。该框架有一种方法可以将多个事件的这些交易分组到一个批次中。为此,它使用 UnitOfWork
。例如,如果您要输入聚合的命令处理函数,则确保 UnitOfWork
处于活动状态以协调生命周期。其中一项任务是将事件配对成一批以供发布。
在您的测试用例中,您直接使用 EventGateway
。本质上完全没问题,但测试是在没有启动 UnitOfWork
的情况下设置的,以协调这两个事件按顺序发生。深入研究代码以查看发布到 SEP 的工作原理,您将在这个阶段进入 AbstractEventProcessor
。执行验证以检查调用 EventProcessor#publish(List<EventMessage>)
时 UnitOfWork
是否处于活动状态。如果是,则将事件添加到 UnitOfWork
.
的右侧阶段
当没有 UnitOfWork
(UoW) 处于活动状态时,将立即调用处理程序。
所以,当使用TrackingEventProcessor
时,框架会有意识地启动一个UoW来将事件批量处理,以便按顺序处理。当使用 SubscribingEventProcessor
时,这项工作留给用户,假设用户通常会通过 [命令处理 -> 事件发布 -> 事件处理] 的常规流程,这将确保 UoW 处于活动状态.由于在您的集成测试中不是这种情况,因此两个发布操作将立即调用 RaceSaga
的 SagaManager
,由于并发性而创建两个实例。
请注意,建议对这些过程使用 TEP。为 Saga 使用 SEP 可能意味着您将在应用程序(错误)关闭期间丢失一些事件。由于 SEP 是一种推送机制,因此无法从这些“丢失的”(从您的事件处理器的角度来看)事件中恢复。 TEP 将解决此问题,因为它会自行处理事件并跟踪流程。
相信这一点可以为你澄清事情@matpiera。
我在使用 Axon 4.3.5 的 Saga 中从跟踪模式切换到订阅模式时发现了意外行为
看来,在订阅模式下,当两个线程同时到达两个@StarSaga方法时,会为同一个关联键值创建两个sagas。 我错过了什么吗?
我有这个来重现它:
@Saga
@ProcessingGroup("Saga")
public class RaceSaga {
@Inject
protected transient CommandGateway commandGateway;
@StartSaga
@SagaEventHandler(associationProperty = "executionId")
public void on(Exec exec) {
commandGateway.sendAndWait(new CreateExecCommand(exec.getExecutionId(), exec.getDescription()));
}
@StartSaga
@SagaEventHandler(associationProperty = "executionId")
public void on(Risk risk) {
commandGateway.sendAndWait(new CreateRiskCommand(risk.getExecutionId(), risk.getResult()));
}
}
@IntegrationTest
class RaceConditionTest extends BaseIntegrationTest {
@Autowired
private EventGateway eventGateway;
@Autowired
private SagaStore sagaStore;
@Test
void sagaRace() {
var execId = UUID.randomUUID();
CompletableFuture.runAsync(() -> eventGateway.publish(new Exec(execId.toString(), "desc")));
CompletableFuture.runAsync(() -> eventGateway.publish(new Risk(execId.toString(), "OK")));
var association = new AssociationValue("executionId", execId.toString());
await().during(5, SECONDS)
.untilAsserted(() -> assertThat(sagaStore.findSagas(RaceSaga.class, association))
.hasSize(1));
}
}
使用跟踪模式时测试通过但订阅失败。 (yml 配置)
老实说,这是测试设置的预期行为,但需要一些解释。
知道下面是订阅(SEP)和跟踪事件处理器(TEP)的主要区别:
SubscribingEventProcessor
- 在EventBus
上发布事件的线程中调用,类似于推送机制。TrackingEventProcessor
- 在单独的线程中调用,从EventStore
检索事件,类似于拉动机制。
这确保了无论事件的并发发布方式如何,TEP 都将确保事件处理顺序。
谈到 SEP,情况略有不同,为此我们需要稍微深入研究一下实现。您可以假设两个或多个事件的发布并不过分奇怪。如果领域内有正确的要求,很多聚合实现都会这样做。该框架有一种方法可以将多个事件的这些交易分组到一个批次中。为此,它使用 UnitOfWork
。例如,如果您要输入聚合的命令处理函数,则确保 UnitOfWork
处于活动状态以协调生命周期。其中一项任务是将事件配对成一批以供发布。
在您的测试用例中,您直接使用 EventGateway
。本质上完全没问题,但测试是在没有启动 UnitOfWork
的情况下设置的,以协调这两个事件按顺序发生。深入研究代码以查看发布到 SEP 的工作原理,您将在这个阶段进入 AbstractEventProcessor
。执行验证以检查调用 EventProcessor#publish(List<EventMessage>)
时 UnitOfWork
是否处于活动状态。如果是,则将事件添加到 UnitOfWork
.
当没有 UnitOfWork
(UoW) 处于活动状态时,将立即调用处理程序。
所以,当使用TrackingEventProcessor
时,框架会有意识地启动一个UoW来将事件批量处理,以便按顺序处理。当使用 SubscribingEventProcessor
时,这项工作留给用户,假设用户通常会通过 [命令处理 -> 事件发布 -> 事件处理] 的常规流程,这将确保 UoW 处于活动状态.由于在您的集成测试中不是这种情况,因此两个发布操作将立即调用 RaceSaga
的 SagaManager
,由于并发性而创建两个实例。
请注意,建议对这些过程使用 TEP。为 Saga 使用 SEP 可能意味着您将在应用程序(错误)关闭期间丢失一些事件。由于 SEP 是一种推送机制,因此无法从这些“丢失的”(从您的事件处理器的角度来看)事件中恢复。 TEP 将解决此问题,因为它会自行处理事件并跟踪流程。
相信这一点可以为你澄清事情@matpiera。