Axon 框架重试逻辑

Axon framework retry logic

我正在尝试使用 Axon 4.1+ 中的 eventProcessingModule 在 2 JVM 节点 K8 集群上执行重播事件。虽然我设置了它会清理事件,但它只从一个节点中拾取它,而另一个节点保持 运行 因为它的跟踪事件仍然存在。

我如何才能同时在所有 JVM 中禁用它,以便它可以正确重播?然后在所有这些上启用以继续处理命令。

我已经尝试通过此代码增加线程,这导致了另一个问题,即现有令牌永远不会在 InitialSemgmentsCount 中增加,除非我从数据库中完全删除令牌。

    public void config(final EventProcessingConfigurer configurer) {
        configurer.registerTrackingEventProcessorConfiguration(c ->
                TrackingEventProcessorConfiguration
                        .forParallelProcessing(2)
                        .andInitialSegmentsCount(2));
        // .andTokenClaimInterval(10L, TimeUnit.SECONDS)); // Event
    }

我当前的设置:

# Application.yml
axon:
  distributed:
    enabled: true

具有以下内容的服务组件:

EventProcessingConfiguration eventProcessingModule

中自动装配
eventProcessingModule
        .eventProcessorByProcessingGroup("query.skuProcessor.SkuProjection", TrackingEventProcessor.class)
        .ifPresent(trackingEventProcessor -> {
            trackingEventProcessor.shutDown();
            trackingEventProcessor.resetTokens();
        });

// Thread.sleep() to verify with

eventProcessingModule
        .eventProcessorByProcessingGroup("query.skuProcessor.SkuProjection", TrackingEventProcessor.class)
        .ifPresent(trackingEventProcessor -> {
            trackingEventProcessor.start();
        });

示例代码在单个 shutDown/reset/start 设置中包含上述内容,但我将它们分开只是为了看看它是如何工作的(相信 @ResetHandler 在 Reset 之后被调用,因此 Start 等待,但不完全确定)

SkuProjection 组件使用 @ResetHandler void 方法清理要重播的 table。

根据当前拥有令牌的 JVM,另一个将 return 此错误: SkuReplayService.startReplay: Failed on exception! Unable to claim token 'query.skuProcessor.SkuProjection[0]'. It is owned by '1@sku-7694bbc6b6-8p958'

我认为 token_entry table 会转到 "null" 的所有者,而它的 "stopped" 但由于有 2 个 JVM,目前没有令牌将接管它,而另一个重播。确认第二个节点是运行第一个节点停止其处理器后的Token。

我想我可以在这里给你一些指导。

您正在研究如何跨多个 JVM 将操作委托给一组特定的跟踪事件处理器,对吗? 现在,API Axon Framework 为您提供 TrackingEventProcessor,是有意维护在 TrackingEventProcessor.

上的

因此,执行 start()shutDown()processingStatus()resetTokens() 是对特定跟踪事件处理器实例执行的调用。

Note: the resetTokens() method is what effectively will issue a replay for the Event Handlers which the Tracking Event Processor is in charge off.

此外,您会看到 Unable to claim token 异常,因为框架要求您执行重置的 TrackingEventProcessor 是给定处理组的所有令牌的所有者。这样做的原因是它需要将令牌调整为 ReplayTokens 以支持像 @ResetHandler.

这样的好功能。

如果我是对的,你现在要求的是:

Does Axon Framework provide a means to delegate a specific Tracking Event Processor operation to all instances which are dealing with the same processing group?

我想我在这里会让你失望@sherring,Axon Framework 不提供委托此类操作的方法。 实现这些委托 start/stop/reset 操作的最快方法是设置 Axon Server,因为标准版(免费)已经为您提供了此功能。

如果出于某种原因无法使用此免费软件,则意味着您必须自己为 start/stop/reset/{insert-any-TrackingEventProcessor-operation} 创建这样一个授权系统。或者,您可以从运营的角度更多地处理这个问题。因此,关闭给定 Axon Framework 应用程序的第二个实例,确保只有一个要重置的 TrackingEventProcessor 实例。

希望这对您有所帮助@sherring!