Axon Sagas 在将事件重播到新数据库时复制事件存储中的事件
Axon Sagas duplicates events in event store when replaying events to new DB
我们有存储新订单的 Axon 应用程序。对于每个订单状态更改 (OrderStateChangedEvent),它都会计划几个任务。任务由另一个 Saga 触发和继续(TaskSaga - 超出问题范围)
当我删除投影数据库,但离开事件存储,然后再次运行应用程序时,事件被重播(正确的是),但任务是重复的。
我想这是因为 OrderStateChangedEvent
每次都会触发一组新的 ScheduleTaskCommand
。
因为我是 Axon 的新手,不知道如何避免这种重复。
AxonServer 上的事件存储 运行ning
Spring 启动应用程序自动配置轴突
投影数据库包含投影表和轴突表:
token_entry
saga_entry
association_value_entry
我想所有事件都被重播了,因为通过重新创建数据库,Axon 表消失了(因此没有关于最后应用事件的记录)
我是不是漏掉了什么?
- token_entry/saga_entry/association_value_entry 表是否应该成为每个应用程序节点上投影表的数据库的一部分?
- 我认为事件存储可能会在不更改事件历史的情况下随时重播到新应用程序节点的数据库中,因此我可以 运行 任意数量的节点。或者我可以随时删除投影 dB 和 运行 应用程序,导致事件再次投影到新数据库的原因。或者这不是真的?
- 总的来说,我的问题是一个事件产生了导致产生新事件(重复)的命令。我应该避免这种 "chaining" 事件以避免重复吗?
谢谢!
轴突配置:
@Configuration
public class AxonConfig {
@Bean
public EventSourcingRepository<ApplicationAggregate> applicationEventSourcingRepository(EventStore eventStore) {
return EventSourcingRepository.builder(ApplicationAggregate.class)
.eventStore(eventStore)
.build();
}
@Bean
public SagaStore sagaStore(EntityManager entityManager) {
return JpaSagaStore.builder().entityManagerProvider(new SimpleEntityManagerProvider(entityManager)).build();
}
}
- 订单聚合收到的 CreateOrderCommand(方法 fromCommand 只是将 1:1 命令映射到事件)
@CommandHandler
public OrderAggregate(CreateOrderCommand cmd) {
apply(OrderCreatedEvent.fromCommand(cmd))
.andThenApply(() -> OrderStateChangedEvent.builder()
.applicationId(cmd.getOrderId())
.newState(OrderState.NEW)
.build());
}
- 订单聚合设置属性
@EventSourcingHandler
protected void on(OrderCreatedEvent event) {
id = event.getOrderId();
// ... additional properties set
}
@EventSourcingHandler
protected void on(OrderStateChangedEvent cmd) {
this.state = cmd.getNewState();
}
- OrderStateChangedEvent 被 Saga 侦听,为特定状态的顺序安排几个任务
private Map<String, TaskStatus> tasks = new HashMap<>();
private OrderState orderState;
@StartSaga
@SagaEventHandler(associationProperty = "orderId")
public void on(OrderStateChangedEvent event) {
orderState = event.getNewState();
List<OrderStateAwareTaskDefinition> tasksByState = taskService.getTasksByState(orderState);
if (tasksByState.isEmpty()) {
finishSaga(event.getOrderId());
}
tasksByState.stream()
.map(task -> ScheduleTaskCommand.builder()
.orderId(event.getOrderId())
.taskId(IdentifierFactory.getInstance().generateIdentifier())
.targetState(orderState)
.taskName(task.getTaskName())
.build())
.peek(command -> tasks.put(command.getTaskId(), SCHEDULED))
.forEach(command -> commandGateway.send(command));
}
我想在这种情况下我可以帮助你。
因此,发生这种情况是因为 TrackingEventProcessor
使用的 TrackingToken
为您的 Saga 实例提供所有事件被初始化为事件流的开头。因此,TrackingEventProcessor
将从时间的开始开始,从而使您的所有命令都被第二次调度。
您可以采取一些措施来解决此问题。
- 您可以不擦除整个数据库,而只擦除投影 table 并保持令牌 table 完好无损。
- 您可以将
TrackingEventProcessor
的 initialTrackingToken
配置为从事件流的头部而不是尾部开始。
选项 1 可以找到,但需要从操作角度进行一些授权。选项 2 将其交由开发人员处理,可能比其他解决方案更安全。
要调整令牌从头开始,您可以实例化一个 TrackingEventProcessor
和一个 TrackingEventProcessorConfiguration
:
EventProcessingConfigurer configurer;
TrackingEventProcessorConfiguration trackingProcessorConfig =
TrackingEventProcessorConfiguration.forSingleThreadedProcessing()
.andInitialTrackingToken(StreamableMessageSource::createHeadToken);
configurer.registerTrackingEventProcessor("{class-name-of-saga}Processor",
Configuration::eventStore,
c -> trackingProcessorConfig);
因此,您将为您的 Saga 创建所需的配置并调用 andInitialTrackingToken()
函数并确保创建不存在标记的头部标记。
希望这对您有所帮助,汤姆!
Steven 的解决方案非常有效,但仅限于 Sagas。对于那些想要达到相同效果但在经典 @EventHandler
中(跳过重播执行)的人来说,有一种方法。首先你必须找出你的跟踪事件处理器是如何命名的——我在 AxonDashboard 中找到它(运行 AxonServer 上的 8024 端口)——通常它是带有 @EventHandler
注释的组件的位置(包名称是精确的)。然后按照史蒂文在他的回答中指出的那样添加配置。
@Autowired
public void customConfig(EventProcessingConfigurer configurer) {
// This prevents from replaying some events in @EventHandler
var trackingProcessorConfig = TrackingEventProcessorConfiguration
.forSingleThreadedProcessing()
.andInitialTrackingToken(StreamableMessageSource::createHeadToken);
configurer.registerTrackingEventProcessor("com.domain.notreplayable",
org.axonframework.config.Configuration::eventStore,
c -> trackingProcessorConfig);
}
我们有存储新订单的 Axon 应用程序。对于每个订单状态更改 (OrderStateChangedEvent),它都会计划几个任务。任务由另一个 Saga 触发和继续(TaskSaga - 超出问题范围)
当我删除投影数据库,但离开事件存储,然后再次运行应用程序时,事件被重播(正确的是),但任务是重复的。
我想这是因为 OrderStateChangedEvent
每次都会触发一组新的 ScheduleTaskCommand
。
因为我是 Axon 的新手,不知道如何避免这种重复。
AxonServer 上的事件存储 运行ning
Spring 启动应用程序自动配置轴突
投影数据库包含投影表和轴突表: token_entry saga_entry association_value_entry
我想所有事件都被重播了,因为通过重新创建数据库,Axon 表消失了(因此没有关于最后应用事件的记录)
我是不是漏掉了什么?
- token_entry/saga_entry/association_value_entry 表是否应该成为每个应用程序节点上投影表的数据库的一部分?
- 我认为事件存储可能会在不更改事件历史的情况下随时重播到新应用程序节点的数据库中,因此我可以 运行 任意数量的节点。或者我可以随时删除投影 dB 和 运行 应用程序,导致事件再次投影到新数据库的原因。或者这不是真的?
- 总的来说,我的问题是一个事件产生了导致产生新事件(重复)的命令。我应该避免这种 "chaining" 事件以避免重复吗?
谢谢!
轴突配置:
@Configuration
public class AxonConfig {
@Bean
public EventSourcingRepository<ApplicationAggregate> applicationEventSourcingRepository(EventStore eventStore) {
return EventSourcingRepository.builder(ApplicationAggregate.class)
.eventStore(eventStore)
.build();
}
@Bean
public SagaStore sagaStore(EntityManager entityManager) {
return JpaSagaStore.builder().entityManagerProvider(new SimpleEntityManagerProvider(entityManager)).build();
}
}
- 订单聚合收到的 CreateOrderCommand(方法 fromCommand 只是将 1:1 命令映射到事件)
@CommandHandler
public OrderAggregate(CreateOrderCommand cmd) {
apply(OrderCreatedEvent.fromCommand(cmd))
.andThenApply(() -> OrderStateChangedEvent.builder()
.applicationId(cmd.getOrderId())
.newState(OrderState.NEW)
.build());
}
- 订单聚合设置属性
@EventSourcingHandler
protected void on(OrderCreatedEvent event) {
id = event.getOrderId();
// ... additional properties set
}
@EventSourcingHandler
protected void on(OrderStateChangedEvent cmd) {
this.state = cmd.getNewState();
}
- OrderStateChangedEvent 被 Saga 侦听,为特定状态的顺序安排几个任务
private Map<String, TaskStatus> tasks = new HashMap<>();
private OrderState orderState;
@StartSaga
@SagaEventHandler(associationProperty = "orderId")
public void on(OrderStateChangedEvent event) {
orderState = event.getNewState();
List<OrderStateAwareTaskDefinition> tasksByState = taskService.getTasksByState(orderState);
if (tasksByState.isEmpty()) {
finishSaga(event.getOrderId());
}
tasksByState.stream()
.map(task -> ScheduleTaskCommand.builder()
.orderId(event.getOrderId())
.taskId(IdentifierFactory.getInstance().generateIdentifier())
.targetState(orderState)
.taskName(task.getTaskName())
.build())
.peek(command -> tasks.put(command.getTaskId(), SCHEDULED))
.forEach(command -> commandGateway.send(command));
}
我想在这种情况下我可以帮助你。
因此,发生这种情况是因为 TrackingEventProcessor
使用的 TrackingToken
为您的 Saga 实例提供所有事件被初始化为事件流的开头。因此,TrackingEventProcessor
将从时间的开始开始,从而使您的所有命令都被第二次调度。
您可以采取一些措施来解决此问题。
- 您可以不擦除整个数据库,而只擦除投影 table 并保持令牌 table 完好无损。
- 您可以将
TrackingEventProcessor
的initialTrackingToken
配置为从事件流的头部而不是尾部开始。
选项 1 可以找到,但需要从操作角度进行一些授权。选项 2 将其交由开发人员处理,可能比其他解决方案更安全。
要调整令牌从头开始,您可以实例化一个 TrackingEventProcessor
和一个 TrackingEventProcessorConfiguration
:
EventProcessingConfigurer configurer;
TrackingEventProcessorConfiguration trackingProcessorConfig =
TrackingEventProcessorConfiguration.forSingleThreadedProcessing()
.andInitialTrackingToken(StreamableMessageSource::createHeadToken);
configurer.registerTrackingEventProcessor("{class-name-of-saga}Processor",
Configuration::eventStore,
c -> trackingProcessorConfig);
因此,您将为您的 Saga 创建所需的配置并调用 andInitialTrackingToken()
函数并确保创建不存在标记的头部标记。
希望这对您有所帮助,汤姆!
Steven 的解决方案非常有效,但仅限于 Sagas。对于那些想要达到相同效果但在经典 @EventHandler
中(跳过重播执行)的人来说,有一种方法。首先你必须找出你的跟踪事件处理器是如何命名的——我在 AxonDashboard 中找到它(运行 AxonServer 上的 8024 端口)——通常它是带有 @EventHandler
注释的组件的位置(包名称是精确的)。然后按照史蒂文在他的回答中指出的那样添加配置。
@Autowired
public void customConfig(EventProcessingConfigurer configurer) {
// This prevents from replaying some events in @EventHandler
var trackingProcessorConfig = TrackingEventProcessorConfiguration
.forSingleThreadedProcessing()
.andInitialTrackingToken(StreamableMessageSource::createHeadToken);
configurer.registerTrackingEventProcessor("com.domain.notreplayable",
org.axonframework.config.Configuration::eventStore,
c -> trackingProcessorConfig);
}