Axon Sagas 在将事件重播到新数据库时复制事件存储中的事件

Axon Sagas duplicates events in event store when replaying events to new DB

我们有存储新订单的 Axon 应用程序。对于每个订单状态更改 (OrderStateChangedEvent),它都会计划几个任务。任务由另一个 Saga 触发和继续(TaskSaga - 超出问题范围)

当我删除投影数据库,但离开事件存储,然后再次运行应用程序时,事件被重播(正确的是),但任务是重复的。

我想这是因为 OrderStateChangedEvent 每次都会触发一组新的 ScheduleTaskCommand

因为我是 Axon 的新手,不知道如何避免这种重复。

AxonServer 上的事件存储 运行ning

Spring 启动应用程序自动配置轴突

投影数据库包含投影表和轴突表: token_entry saga_entry association_value_entry

我想所有事件都被重播了,因为通过重新创建数据库,Axon 表消失了(因此没有关于最后应用事件的记录)

我是不是漏掉了什么?

谢谢!

轴突配置:

@Configuration
public class AxonConfig {

    @Bean
    public EventSourcingRepository<ApplicationAggregate> applicationEventSourcingRepository(EventStore eventStore) {
        return EventSourcingRepository.builder(ApplicationAggregate.class)
                        .eventStore(eventStore)
                        .build();
    }

    @Bean
    public SagaStore sagaStore(EntityManager entityManager) {
        return JpaSagaStore.builder().entityManagerProvider(new SimpleEntityManagerProvider(entityManager)).build();
    }
}
  1. 订单聚合收到的 CreateOrderCommand(方法 fromCommand 只是将 1:1 命令映射到事件)
    @CommandHandler
    public OrderAggregate(CreateOrderCommand cmd) {
        apply(OrderCreatedEvent.fromCommand(cmd))
                .andThenApply(() -> OrderStateChangedEvent.builder()
                        .applicationId(cmd.getOrderId())
                        .newState(OrderState.NEW)
                        .build());
    }
  1. 订单聚合设置属性
    @EventSourcingHandler
    protected void on(OrderCreatedEvent event) {
        id = event.getOrderId();

        // ... additional properties set

    }

    @EventSourcingHandler
    protected void on(OrderStateChangedEvent cmd) {
        this.state = cmd.getNewState();
    }
  1. OrderStateChangedEvent 被 Saga 侦听,为特定状态的顺序安排几个任务
    private Map<String, TaskStatus> tasks = new HashMap<>();

    private OrderState orderState;

    @StartSaga
    @SagaEventHandler(associationProperty = "orderId")
    public void on(OrderStateChangedEvent event) {
        orderState = event.getNewState();
        List<OrderStateAwareTaskDefinition> tasksByState = taskService.getTasksByState(orderState);
        if (tasksByState.isEmpty()) {
            finishSaga(event.getOrderId());
        }

        tasksByState.stream()
                .map(task -> ScheduleTaskCommand.builder()
                        .orderId(event.getOrderId())
                        .taskId(IdentifierFactory.getInstance().generateIdentifier())
                        .targetState(orderState)
                        .taskName(task.getTaskName())
                        .build())
                .peek(command -> tasks.put(command.getTaskId(), SCHEDULED))
                .forEach(command -> commandGateway.send(command));
    }

我想在这种情况下我可以帮助你。 因此,发生这种情况是因为 TrackingEventProcessor 使用的 TrackingToken 为您的 Saga 实例提供所有事件被初始化为事件流的开头。因此,TrackingEventProcessor 将从时间的开始开始,从而使您的所有命令都被第二次调度。

您可以采取一些措施来解决此问题。

  1. 您可以不擦除整个数据库,而只擦除投影 table 并保持令牌 table 完好无损。
  2. 您可以将 TrackingEventProcessorinitialTrackingToken 配置为从事件流的头部而不是尾部开始。

选项 1 可以找到,但需要从操作角度进行一些授权。选项 2 将其交由开发人员处理,可能比其他解决方案更安全。

要调整令牌从头开始,您可以实例化一个 TrackingEventProcessor 和一个 TrackingEventProcessorConfiguration:

    EventProcessingConfigurer configurer;

    TrackingEventProcessorConfiguration trackingProcessorConfig =
            TrackingEventProcessorConfiguration.forSingleThreadedProcessing()
                                               .andInitialTrackingToken(StreamableMessageSource::createHeadToken);

    configurer.registerTrackingEventProcessor("{class-name-of-saga}Processor", 
                                              Configuration::eventStore, 
                                              c -> trackingProcessorConfig);

因此,您将为您的 Saga 创建所需的配置并调用 andInitialTrackingToken() 函数并确保创建不存在标记的头部标记。

希望这对您有所帮助,汤姆!

Steven 的解决方案非常有效,但仅限于 Sagas。对于那些想要达到相同效果但在经典 @EventHandler 中(跳过重播执行)的人来说,有一种方法。首先你必须找出你的跟踪事件处理器是如何命名的——我在 AxonDashboard 中找到它(运行 AxonServer 上的 8024 端口)——通常它是带有 @EventHandler 注释的组件的位置(包名称是精确的)。然后按照史蒂文在他的回答中指出的那样添加配置。

    @Autowired
    public void customConfig(EventProcessingConfigurer configurer) {
        // This prevents from replaying some events in @EventHandler
        var trackingProcessorConfig = TrackingEventProcessorConfiguration
                .forSingleThreadedProcessing()
                .andInitialTrackingToken(StreamableMessageSource::createHeadToken);
        configurer.registerTrackingEventProcessor("com.domain.notreplayable",
                org.axonframework.config.Configuration::eventStore,
                c -> trackingProcessorConfig);
    }