Axon 框架的排序策略如何在状态方面发挥作用

How Axon framework's sequencing policy works in terms of statefulness

在 Axon 的 reference guide 中写着

Besides these provided policies, you can define your own. All policies must implement the SequencingPolicy interface. This interface defines a single method, getSequenceIdentifierFor, that returns the sequence identifier for a given event. Events for which an equal sequence identifier is returned must be processed sequentially. Events that produce a different sequence identifier may be processed concurrently.

更重要的是,在 this thread 的最后一条消息中说

with the sequencing policy, you indicate which events need to be processed sequentially. It doesn't matter whether the threads are in the same JVM, or in different ones. If the sequencing policy returns the same value for 2 messages, they will be guaranteed to be processed sequentially, even if you have tracking processor threads across multiple JVMs.

那么这是否意味着事件处理器实际上是无状态的?如果是,那么他们如何设法同步? token store是否用于此目的?

我认为这取决于您将什么视为状态,但我假设从您的角度来看,是的,Axon 中的 EventProcessor 实现确实是无状态的。

SubscribingEventProcessor 接收 它是来自 SubscribableMessageSource 的事件(EventBus 实现此接口)。 TrackingEventProcessor 检索 它是来自 StreamableMessageSource 的事件(EventStore 实现了这个接口)在它自己的空闲时间。

后一个版本需要跟踪事件流中事件的位置。此信息存储在 TrackingToken 中,由 TokenStore 保存。 一个给定的 TrackingEventProcessor 线程 只能 处理事件,前提是它已经对它所属的处理组的 TrackingToken 提出了要求。因此,这可确保同一事件不会由两个不同的线程处理而意外更新同一查询模型。 TrackingToken 还允许对这个过程进行多线程处理,这是通过对令牌进行分段来完成的。段数(可通过 initialSegmentCount 调整)驱动 件的数量 给定处理组的 TrackingToken 将被分区。从角度来看TokenStore,这意味着您将存储多个 TrackingToken 个实例,这些实例等于您设置的段数。

SequencingPolicy 它的工作是驱动流中的哪些事件属于哪个段。这样做,例如,您可以使用 SequentialPerAggregate SequencingPolicy 来确保具有给定聚合标识符的所有事件都由一个段处理。