Axon - 发生错误后卡在 1 个事件中

Axon - stuck in 1 event after error occurred

我最近学习了 Axon 和 CQRS & Event Sourcing。

CMIIW 事件存储就像一个应用程序状态,所以在 Axon 中我们有 org.axonframework.eventhandling.GlobalSequenceTrackingToken 这意味着我们可以从某个时间知道我们应用程序的当前状态这个记录对吗?

processor_name   |   segment owner| timestamp                      | token             | token_type
myapps.projection|       0    far |  2021-02-03T02:22:46.6003318Z  | {"globalIndex":32}|  org.axonframework.eventhandling.GlobalSequenceTrackingToken

所以一切正常,直到我忘记做一些验证然后我的应用程序卡在 1 个事件 ({"globalIndex":32}) 尝试发布此事件直到成功。

发生这种情况是因为我的 @Eventhandler 抛出了一个错误。

解释: 我有 2 个事件,具有相同的 userName@EventHandler 无法解决此问题并引发错误,因为 userName must be unique.

token:31,payload: {"id":"d6554811-ee5a-46d5-973d-1d61ee71c7fe","fullName":"xasxsax","userName":"xasxsaxsa","email":"xasxsa@g.com","password":"adminadmin"}

token:32,payload:{"id":"b41efb71-14ee-4a5f-bdef-93b36e2e6a84","fullName":"xasxsax","userName":"xasxsaxsa","email":"xasxsa@g.com","password":"adminadmin"}

我的问题是,是否有任何处​​理错误的最佳做法,以便其他事件可以正常工作,而无需等待修复此单个事件?或者有解决这个问题的方法吗?

假设您使用的是 Axon 4 和默认的 TrackingEventProcessor。 这些事件形成一个流,所以你必须在处理下一个事件之前处理这个事件;从 @EventHandler 抛出异常告诉 Axon 您当前无法处理该事件。 在这种特定情况下,您可能应该简单地记录错误,而不是抛出错误,因为第二个事件根本没有意义 - 它要求创建已经存在的东西。

对于其他类型的失败事件,再次只记录错误并允许流继续,然后在纠正失败原因后重播事件可能会很有趣。要重播单个聚合的特定事件,您可以这样做:

@Autowired
EventStore eventStore;
    
@Test
public void testReplay() {
    
    String aggregateIdentifier = "1234";
    long startFrom = 0;
    String eventType = "org.sample.model.events.MyEvent";
    Class projectorClass = MyProjector.class;
    
    AnnotationEventHandlerAdapter eventHandlerAdapter = new AnnotationEventHandlerAdapter(projectorClass);
    DomainEventStream eventStream = eventStore.readEvents(aggregateIdentifier, startFrom);
    eventStream.asStream()
        .filter(event -> {
            // add any type of filtering based on the event here
            return event.getType() == eventType;
        })
        .forEach(event1 -> {
            try {
                eventHandlerAdapter.handle(event1);
            } catch (Exception e) {
                e.printStackTrace();
            }
        });
}

您可以在操作界面中公开此方法,以允许您的操作团队在分析后重播特定事件。

简而言之,失败事件的死信队列将是一个不错的解决方案。就目前而言,Axon Framework 不提供死信队列。它为您提供的是您可以配置的两个级别的错误处理。

首先是 ListenerInvocationErrorHandler。对于从 @EventHandler 注释函数中抛出的任何异常,都会调用此错误处理程序。 ListenerInvocationErrorHandler 默认配置为 LogginErrorHandler,您可能已经猜到了,它只是记录异常 并继续

第二层就是所谓的ErrorHandler。为事件处理器内部发生的事务性异常调用此接口。通常应该完全停止处理器工作的异常。此错误处理程序默认为 PropagatingErrorHandler,它只是重新抛出您的异常。在内部(默认)这样做 TrackingEventProcessor 意味着事件处理器进入重试模式。

根据您的描述,我假设您遇到了事务异常,或者您已将 ListenerInvocationErrorHandler 配置为 PropagatingErrorHandler。如果您不想在发生异常时停止处理器,则可以将其配置为简单地记录并继续 LoggingErrorHandler.

这样做是否明智,完全是另一回事。正如 Bart 在另一个 post 中提到的那样,无论出于何种原因,您的验证过程决定该事件发生,这些事件都在您的商店中。正如活动商店规定的那样,它会一直存在。这意味着您将不得不应对它。解决此类故障事件的解决方案是在您的应用程序中处理补偿操作。 here.

可以找到一种不太技术性的方式来描述这种必要性

无论如何,如果事件出错,您的设置将不得不处理它。例如添加一个 try-catch 块来处理已知的可能异常。或者使事件处理对于您预期的某些问题更具弹性。或者,继续努力,构建一个死信队列。