防止重复 AggregateCreated 事件的最佳实践
Best practice for preventing duplicate AggregateCreated events
我有以下 (Axon) 聚合体:
@Aggregate
@NoArgsConstructor
public class Car{
@AggregateIdentifier
private String id;
@CommandHandler
public Car(CreateCar command){
apply( new CarCreated(command.getId()) );
}
@EventSourcingHandler
public void carCreated(CarCreated event) {
this.id = event.getId();
}
}
我可以通过提交具有特定 ID 的 CreateCar
命令来创建汽车,从而引发 CarCreated
事件。太棒了
但是,如果我发送另一个具有相同 ID 的 CreateCar
命令,则聚合无法验证该命令(给定的 ID 已经存在)。随后它将简单地触发一个新的 CarCreated
事件。这是一个谎言。
如果汽车已经存在,确保 CreateCar
命令失败的最佳方法是什么?
当然我可以先检查存储库,但这不会阻止竞争条件...
What would be the best approach to make sure the CreateCar command fails if the car already exists? Naturally I could first check the repository, but this won't prevent race conditions...
没有魔法。
如果您要避免不正常的写入,那么您需要获取对数据存储的锁定,或者您需要具有 compare and swap
语义的数据存储。
使用锁,您可以保证在您对存储中数据的读取与后续写入之间不会发生冲突更新。
lock = lock_for_id id
lock.acquire
Try:
Option[Car] root = repository.load id
switch root {
case None:
Car car = createCar ...
repository.store car
case Some(car):
// deal with the fact that the car has already been created
}
Finally:
lock.release
您想为每个聚合都设置一个锁,但创建锁与创建聚合具有相同的竞争条件。所以你最终可能会得到类似 coarse grained lock 的东西来限制对操作的访问。
通过比较和交换,您可以将争用管理推向数据存储。而不是向商店发送 PUT, you are sending a conditional PUT.
Option[Car] root = repository.load id
switch root {
case None:
Car car = createCar ...
repository.replace car None
case Some(car):
// deal with the fact that the car has already been created
}
我们不再需要锁,因为我们正在为存储精确描述需要满足的前提条件(例如If-None-Match: *)。
事件存储通常支持比较和交换语义;将新事件“附加”到流上是通过制作一个查询来完成的,该查询标识尾指针的预期位置,并使用特殊编码的值来标识预期创建流的情况(例如,事件存储支持 ExpectedVersion.NoStream语义)。
However, if I send another CreateCar command, with the same Id, the command cannot be validated by the aggregate (that the given id already exists). Subsequently it will simply fire a new CarCreated event. Which is a lie.
Axon 实际上会为您解决这个问题。当聚合发布事件时,它不会立即发布到其他组件。它在工作单元中暂存,等待处理程序执行完成。
处理程序执行后,将调用许多 "prepare commit" 个处理程序。其中一个存储聚合(在使用事件溯源时是空操作),另一个存储事件的发布(在事务范围内)。
根据您是否使用事件溯源,将聚合实例添加到持久存储将失败(重复键),或者创建事件的发布将失败(重复聚合标识符+序列号)。
我有以下 (Axon) 聚合体:
@Aggregate
@NoArgsConstructor
public class Car{
@AggregateIdentifier
private String id;
@CommandHandler
public Car(CreateCar command){
apply( new CarCreated(command.getId()) );
}
@EventSourcingHandler
public void carCreated(CarCreated event) {
this.id = event.getId();
}
}
我可以通过提交具有特定 ID 的 CreateCar
命令来创建汽车,从而引发 CarCreated
事件。太棒了
但是,如果我发送另一个具有相同 ID 的 CreateCar
命令,则聚合无法验证该命令(给定的 ID 已经存在)。随后它将简单地触发一个新的 CarCreated
事件。这是一个谎言。
如果汽车已经存在,确保 CreateCar
命令失败的最佳方法是什么?
当然我可以先检查存储库,但这不会阻止竞争条件...
What would be the best approach to make sure the CreateCar command fails if the car already exists? Naturally I could first check the repository, but this won't prevent race conditions...
没有魔法。
如果您要避免不正常的写入,那么您需要获取对数据存储的锁定,或者您需要具有 compare and swap
语义的数据存储。
使用锁,您可以保证在您对存储中数据的读取与后续写入之间不会发生冲突更新。
lock = lock_for_id id
lock.acquire
Try:
Option[Car] root = repository.load id
switch root {
case None:
Car car = createCar ...
repository.store car
case Some(car):
// deal with the fact that the car has already been created
}
Finally:
lock.release
您想为每个聚合都设置一个锁,但创建锁与创建聚合具有相同的竞争条件。所以你最终可能会得到类似 coarse grained lock 的东西来限制对操作的访问。
通过比较和交换,您可以将争用管理推向数据存储。而不是向商店发送 PUT, you are sending a conditional PUT.
Option[Car] root = repository.load id
switch root {
case None:
Car car = createCar ...
repository.replace car None
case Some(car):
// deal with the fact that the car has already been created
}
我们不再需要锁,因为我们正在为存储精确描述需要满足的前提条件(例如If-None-Match: *)。
事件存储通常支持比较和交换语义;将新事件“附加”到流上是通过制作一个查询来完成的,该查询标识尾指针的预期位置,并使用特殊编码的值来标识预期创建流的情况(例如,事件存储支持 ExpectedVersion.NoStream语义)。
However, if I send another CreateCar command, with the same Id, the command cannot be validated by the aggregate (that the given id already exists). Subsequently it will simply fire a new CarCreated event. Which is a lie.
Axon 实际上会为您解决这个问题。当聚合发布事件时,它不会立即发布到其他组件。它在工作单元中暂存,等待处理程序执行完成。 处理程序执行后,将调用许多 "prepare commit" 个处理程序。其中一个存储聚合(在使用事件溯源时是空操作),另一个存储事件的发布(在事务范围内)。
根据您是否使用事件溯源,将聚合实例添加到持久存储将失败(重复键),或者创建事件的发布将失败(重复聚合标识符+序列号)。