Axon 框架重试逻辑
Axon framework retry logic
我正在尝试使用 Axon 4.1+ 中的 eventProcessingModule
在 2 JVM 节点 K8 集群上执行重播事件。虽然我设置了它会清理事件,但它只从一个节点中拾取它,而另一个节点保持 运行 因为它的跟踪事件仍然存在。
我如何才能同时在所有 JVM 中禁用它,以便它可以正确重播?然后在所有这些上启用以继续处理命令。
我已经尝试通过此代码增加线程,这导致了另一个问题,即现有令牌永远不会在 InitialSemgmentsCount 中增加,除非我从数据库中完全删除令牌。
public void config(final EventProcessingConfigurer configurer) {
configurer.registerTrackingEventProcessorConfiguration(c ->
TrackingEventProcessorConfiguration
.forParallelProcessing(2)
.andInitialSegmentsCount(2));
// .andTokenClaimInterval(10L, TimeUnit.SECONDS)); // Event
}
我当前的设置:
# Application.yml
axon:
distributed:
enabled: true
具有以下内容的服务组件:
在 EventProcessingConfiguration eventProcessingModule
中自动装配
eventProcessingModule
.eventProcessorByProcessingGroup("query.skuProcessor.SkuProjection", TrackingEventProcessor.class)
.ifPresent(trackingEventProcessor -> {
trackingEventProcessor.shutDown();
trackingEventProcessor.resetTokens();
});
// Thread.sleep() to verify with
eventProcessingModule
.eventProcessorByProcessingGroup("query.skuProcessor.SkuProjection", TrackingEventProcessor.class)
.ifPresent(trackingEventProcessor -> {
trackingEventProcessor.start();
});
示例代码在单个 shutDown/reset/start 设置中包含上述内容,但我将它们分开只是为了看看它是如何工作的(相信 @ResetHandler 在 Reset 之后被调用,因此 Start 等待,但不完全确定)
SkuProjection 组件使用 @ResetHandler
void 方法清理要重播的 table。
根据当前拥有令牌的 JVM,另一个将 return 此错误:
SkuReplayService.startReplay: Failed on exception! Unable to claim token 'query.skuProcessor.SkuProjection[0]'. It is owned by '1@sku-7694bbc6b6-8p958'
我认为 token_entry
table 会转到 "null" 的所有者,而它的 "stopped" 但由于有 2 个 JVM,目前没有令牌将接管它,而另一个重播。确认第二个节点是运行第一个节点停止其处理器后的Token。
我想我可以在这里给你一些指导。
您正在研究如何跨多个 JVM 将操作委托给一组特定的跟踪事件处理器,对吗?
现在,API Axon Framework 为您提供 TrackingEventProcessor
,是有意维护在 TrackingEventProcessor
.
上的
因此,执行 start()
、shutDown()
、processingStatus()
或 resetTokens()
是对特定跟踪事件处理器实例执行的调用。
Note: the resetTokens()
method is what effectively will issue a replay for the Event Handlers which the Tracking Event Processor is in charge off.
此外,您会看到 Unable to claim token
异常,因为框架要求您执行重置的 TrackingEventProcessor
是给定处理组的所有令牌的所有者。这样做的原因是它需要将令牌调整为 ReplayTokens
以支持像 @ResetHandler
.
这样的好功能。
如果我是对的,你现在要求的是:
Does Axon Framework provide a means to delegate a specific Tracking Event Processor operation to all instances which are dealing with the same processing group?
我想我在这里会让你失望@sherring,Axon Framework 不提供委托此类操作的方法。
实现这些委托 start/stop/reset 操作的最快方法是设置 Axon Server,因为标准版(免费)已经为您提供了此功能。
如果出于某种原因无法使用此免费软件,则意味着您必须自己为 start/stop/reset/{insert-any-TrackingEventProcessor-operation} 创建这样一个授权系统。或者,您可以从运营的角度更多地处理这个问题。因此,关闭给定 Axon Framework 应用程序的第二个实例,确保只有一个要重置的 TrackingEventProcessor
实例。
希望这对您有所帮助@sherring!
我正在尝试使用 Axon 4.1+ 中的 eventProcessingModule
在 2 JVM 节点 K8 集群上执行重播事件。虽然我设置了它会清理事件,但它只从一个节点中拾取它,而另一个节点保持 运行 因为它的跟踪事件仍然存在。
我如何才能同时在所有 JVM 中禁用它,以便它可以正确重播?然后在所有这些上启用以继续处理命令。
我已经尝试通过此代码增加线程,这导致了另一个问题,即现有令牌永远不会在 InitialSemgmentsCount 中增加,除非我从数据库中完全删除令牌。
public void config(final EventProcessingConfigurer configurer) {
configurer.registerTrackingEventProcessorConfiguration(c ->
TrackingEventProcessorConfiguration
.forParallelProcessing(2)
.andInitialSegmentsCount(2));
// .andTokenClaimInterval(10L, TimeUnit.SECONDS)); // Event
}
我当前的设置:
# Application.yml
axon:
distributed:
enabled: true
具有以下内容的服务组件:
在 EventProcessingConfiguration eventProcessingModule
eventProcessingModule
.eventProcessorByProcessingGroup("query.skuProcessor.SkuProjection", TrackingEventProcessor.class)
.ifPresent(trackingEventProcessor -> {
trackingEventProcessor.shutDown();
trackingEventProcessor.resetTokens();
});
// Thread.sleep() to verify with
eventProcessingModule
.eventProcessorByProcessingGroup("query.skuProcessor.SkuProjection", TrackingEventProcessor.class)
.ifPresent(trackingEventProcessor -> {
trackingEventProcessor.start();
});
示例代码在单个 shutDown/reset/start 设置中包含上述内容,但我将它们分开只是为了看看它是如何工作的(相信 @ResetHandler 在 Reset 之后被调用,因此 Start 等待,但不完全确定)
SkuProjection 组件使用 @ResetHandler
void 方法清理要重播的 table。
根据当前拥有令牌的 JVM,另一个将 return 此错误:
SkuReplayService.startReplay: Failed on exception! Unable to claim token 'query.skuProcessor.SkuProjection[0]'. It is owned by '1@sku-7694bbc6b6-8p958'
我认为 token_entry
table 会转到 "null" 的所有者,而它的 "stopped" 但由于有 2 个 JVM,目前没有令牌将接管它,而另一个重播。确认第二个节点是运行第一个节点停止其处理器后的Token。
我想我可以在这里给你一些指导。
您正在研究如何跨多个 JVM 将操作委托给一组特定的跟踪事件处理器,对吗?
现在,API Axon Framework 为您提供 TrackingEventProcessor
,是有意维护在 TrackingEventProcessor
.
因此,执行 start()
、shutDown()
、processingStatus()
或 resetTokens()
是对特定跟踪事件处理器实例执行的调用。
Note: the
resetTokens()
method is what effectively will issue a replay for the Event Handlers which the Tracking Event Processor is in charge off.
此外,您会看到 Unable to claim token
异常,因为框架要求您执行重置的 TrackingEventProcessor
是给定处理组的所有令牌的所有者。这样做的原因是它需要将令牌调整为 ReplayTokens
以支持像 @ResetHandler
.
如果我是对的,你现在要求的是:
Does Axon Framework provide a means to delegate a specific Tracking Event Processor operation to all instances which are dealing with the same processing group?
我想我在这里会让你失望@sherring,Axon Framework 不提供委托此类操作的方法。 实现这些委托 start/stop/reset 操作的最快方法是设置 Axon Server,因为标准版(免费)已经为您提供了此功能。
如果出于某种原因无法使用此免费软件,则意味着您必须自己为 start/stop/reset/{insert-any-TrackingEventProcessor-operation} 创建这样一个授权系统。或者,您可以从运营的角度更多地处理这个问题。因此,关闭给定 Axon Framework 应用程序的第二个实例,确保只有一个要重置的 TrackingEventProcessor
实例。
希望这对您有所帮助@sherring!