使用 Axon 服务器将事件分布到不同的 JVM 以订阅事件处理器(没有事件源)
Distributing events across different JVMs with Axon Server to Subscribing Event Processors (without Event Sourcing)
我在一个模块(JVM、容器)中使用带有聚合的 Axon Framework (4.1),在另一个模块中使用 projections/Sagas。我想要做的是让分布式应用程序利用 CQRS 但没有事件源.
设置非常简单,并且在单个应用程序中一切都按预期工作。当涉及多个独立模块(跨不同的 JVM)时,就会出现问题。开箱即用的 Axon 启动器使用连接到 AxonServerEventStore
的跟踪处理器,在跨不同 JVM 监听事件时允许 "location transparency"。
就我而言,我不需要任何基础设施来保存或跟踪事件。我只想以 即发即弃的方式 将事件从我的聚合分发到任何订阅处理器 (SEP),就像 AxonServerQueryBus
分发分散-聚集所做的一样例如查询。
如果我声明所有处理器都订阅如下:
@Autowired
public void configureEventSubscribers(EventProcessingConfigurer configurer) {
configurer.usingSubscribingEventProcessors();
}
事件到达同一 JVM 中的所有 @EventHandler
方法,但事件不再到达其他 JVM 中的任何处理程序。如果我的理解是正确的,那么,Axon 服务器将跨 JVM 分发事件,仅供 跟踪处理器 (TEP)。
显然,我能做的是结合使用外部消息代理(RabbitMQ、Kafka)和 SpringAMQPMessageSource
(如文档中所述),通过 RabbitMQ 中的扇出等方式将事件分发给所有订阅者.这行得通,但这需要我自己维护代理。
如果让 Axon Server 像处理分发命令和查询一样处理这件事就好了(这会让我少关心一个基础设施)。
作为旁注,我实际上已经设法使用 QueryBus
将事件分发到投影,并将事件作为有效负载传递给 GenericQueryMessage
作为分散-收集查询发送。不用说,这不是一个可靠的解决方案。但它证明了 Axon Server 将事件(毕竟只是另一种类型的消息)无差别地分发到 SEP 或 TEP 本质上没有什么是不可能的。
最后,问题:
1) 在位置透明度和分发事件方面,社区对使用 Axon 的纯 CQRS(无事件源)的建议是什么?
2) 是否可以使 Axon 服务器跨 JVM 将事件分发到 SEP(消除对外部消息代理的需要)?
事件溯源注意事项
从 Axon Framework 的角度来看,事件溯源是您的命令模型的唯一关注点。采取这种立场,因为事件溯源通过 it 发布的事件定义模型的重建。然而,查询模型不会对发布事件更改其状态的命令做出反应,它只是侦听(分布式)事件以更新其状态以供其他人查询。
因此,框架仅在通过提供 EventSourcingRepository
.
重新创建聚合时才考虑事件源。
事件处理器的工作是"mechanical aspect of providing events to your Event Handlers"。这与 CQRS 中的 Q 部分有关,与重新创建查询模型有关。
因此,框架不将事件处理器视为事件溯源概念的一部分。
你的场景的答案
我想强调的是,如果您通过给定应用程序的 运行 多个 实例分发您的应用程序,您很可能需要有一种方法来确保给定事件仅处理 一次.
这是跟踪事件处理器 (TEP) 解决的问题之一,它通过使用跟踪令牌来实现。
Tracking Token 本质上充当定义哪些事件已被处理的标记。添加,给定的 TEP 线程倾向于对能够工作的令牌提出要求,从而确保给定的事件不会被处理两次。
最后,您将需要定义基础架构来存储跟踪令牌以便能够分发事件负载,基本上完全选择不使用 SubscribingEventProcessor
。
但是,以上是否是一个问题确实取决于您的应用程序环境。
也许您根本没有复制给定的应用程序,因此实际上 而不是 复制给定的跟踪事件处理器。
在这种情况下,您可以完成对 "not track events" 的请求,同时仍然使用跟踪事件处理器。
您所要做的就是确保您没有存储它们。用于存储令牌的接口是 TokenStore
,内存中存在 版本。
然而,在默认的 Axon 设置中使用 InMemoryTokenStore
将意味着从技术上讲,您每次都会重播您的事件。这是由于默认的 "initial Tracking Token" 进程造成的。当然,这也是可配置的,为此我建议您使用以下方法:
// Creating the configuration for a TEP
TrackingEventProcessorConfiguration tepConfig =
TrackingEventProcessorConfiguration
.forSingleThreadedProcessing() // Note: could also be multi-threaded
.andInitialTrackingToken(StreamableMessageSource::createHeadToken);
// Registering as default TEP config
EventProcessingConfigurer.
registerTrackingEventProcessorConfiguration(config -> tepConfig);
这应该可以让您使用 TEP,而无需设置基础设施来存储令牌。但请注意,这需要您而非复制给定的应用程序。
我想以您发布的以下问题结束:
Is it possible to make Axon Server to distribute events to SEPs across JVMs (eliminating the need for an external message broker)?
正如您正确指出的那样,SEP(当前)仅可用于订阅已在给定 JVM 中发布的事件。 Axon 服务器(目前)还没有一种机制可以将事件从一个 JVM 桥接到另一个 JVM 以允许分布式订阅事件处理。我是(作为 AxonIQ 的一部分)但是相对确定我们将来会研究这个问题。如果这样的功能对您的项目的成功完成很重要,我建议直接 contact AxonIQ。
如果您考虑将 Apache Kafka 用于此用例,您可能需要查看 kalium.alkal.io。
它会让你的代码更简单
MyObject myObject = ....
kalium.post(myObject); //this is used to send POJOs/protobuffs using Kafka Producer
//On the consumer side. This will use a deserializer with the Kafka Consumer API
kalium.on(MyObject.class, myObject -> {
// do something with the object
}, "consumer_group");
我在一个模块(JVM、容器)中使用带有聚合的 Axon Framework (4.1),在另一个模块中使用 projections/Sagas。我想要做的是让分布式应用程序利用 CQRS 但没有事件源.
设置非常简单,并且在单个应用程序中一切都按预期工作。当涉及多个独立模块(跨不同的 JVM)时,就会出现问题。开箱即用的 Axon 启动器使用连接到 AxonServerEventStore
的跟踪处理器,在跨不同 JVM 监听事件时允许 "location transparency"。
就我而言,我不需要任何基础设施来保存或跟踪事件。我只想以 即发即弃的方式 将事件从我的聚合分发到任何订阅处理器 (SEP),就像 AxonServerQueryBus
分发分散-聚集所做的一样例如查询。
如果我声明所有处理器都订阅如下:
@Autowired
public void configureEventSubscribers(EventProcessingConfigurer configurer) {
configurer.usingSubscribingEventProcessors();
}
事件到达同一 JVM 中的所有 @EventHandler
方法,但事件不再到达其他 JVM 中的任何处理程序。如果我的理解是正确的,那么,Axon 服务器将跨 JVM 分发事件,仅供 跟踪处理器 (TEP)。
显然,我能做的是结合使用外部消息代理(RabbitMQ、Kafka)和 SpringAMQPMessageSource
(如文档中所述),通过 RabbitMQ 中的扇出等方式将事件分发给所有订阅者.这行得通,但这需要我自己维护代理。
如果让 Axon Server 像处理分发命令和查询一样处理这件事就好了(这会让我少关心一个基础设施)。
作为旁注,我实际上已经设法使用 QueryBus
将事件分发到投影,并将事件作为有效负载传递给 GenericQueryMessage
作为分散-收集查询发送。不用说,这不是一个可靠的解决方案。但它证明了 Axon Server 将事件(毕竟只是另一种类型的消息)无差别地分发到 SEP 或 TEP 本质上没有什么是不可能的。
最后,问题:
1) 在位置透明度和分发事件方面,社区对使用 Axon 的纯 CQRS(无事件源)的建议是什么?
2) 是否可以使 Axon 服务器跨 JVM 将事件分发到 SEP(消除对外部消息代理的需要)?
事件溯源注意事项
从 Axon Framework 的角度来看,事件溯源是您的命令模型的唯一关注点。采取这种立场,因为事件溯源通过 it 发布的事件定义模型的重建。然而,查询模型不会对发布事件更改其状态的命令做出反应,它只是侦听(分布式)事件以更新其状态以供其他人查询。
因此,框架仅在通过提供 EventSourcingRepository
.
事件处理器的工作是"mechanical aspect of providing events to your Event Handlers"。这与 CQRS 中的 Q 部分有关,与重新创建查询模型有关。 因此,框架不将事件处理器视为事件溯源概念的一部分。
你的场景的答案
我想强调的是,如果您通过给定应用程序的 运行 多个 实例分发您的应用程序,您很可能需要有一种方法来确保给定事件仅处理 一次.
这是跟踪事件处理器 (TEP) 解决的问题之一,它通过使用跟踪令牌来实现。 Tracking Token 本质上充当定义哪些事件已被处理的标记。添加,给定的 TEP 线程倾向于对能够工作的令牌提出要求,从而确保给定的事件不会被处理两次。
最后,您将需要定义基础架构来存储跟踪令牌以便能够分发事件负载,基本上完全选择不使用 SubscribingEventProcessor
。
但是,以上是否是一个问题确实取决于您的应用程序环境。
也许您根本没有复制给定的应用程序,因此实际上 而不是 复制给定的跟踪事件处理器。
在这种情况下,您可以完成对 "not track events" 的请求,同时仍然使用跟踪事件处理器。
您所要做的就是确保您没有存储它们。用于存储令牌的接口是 TokenStore
,内存中存在 版本。
然而,在默认的 Axon 设置中使用 InMemoryTokenStore
将意味着从技术上讲,您每次都会重播您的事件。这是由于默认的 "initial Tracking Token" 进程造成的。当然,这也是可配置的,为此我建议您使用以下方法:
// Creating the configuration for a TEP
TrackingEventProcessorConfiguration tepConfig =
TrackingEventProcessorConfiguration
.forSingleThreadedProcessing() // Note: could also be multi-threaded
.andInitialTrackingToken(StreamableMessageSource::createHeadToken);
// Registering as default TEP config
EventProcessingConfigurer.
registerTrackingEventProcessorConfiguration(config -> tepConfig);
这应该可以让您使用 TEP,而无需设置基础设施来存储令牌。但请注意,这需要您而非复制给定的应用程序。
我想以您发布的以下问题结束:
Is it possible to make Axon Server to distribute events to SEPs across JVMs (eliminating the need for an external message broker)?
正如您正确指出的那样,SEP(当前)仅可用于订阅已在给定 JVM 中发布的事件。 Axon 服务器(目前)还没有一种机制可以将事件从一个 JVM 桥接到另一个 JVM 以允许分布式订阅事件处理。我是(作为 AxonIQ 的一部分)但是相对确定我们将来会研究这个问题。如果这样的功能对您的项目的成功完成很重要,我建议直接 contact AxonIQ。
如果您考虑将 Apache Kafka 用于此用例,您可能需要查看 kalium.alkal.io。 它会让你的代码更简单
MyObject myObject = ....
kalium.post(myObject); //this is used to send POJOs/protobuffs using Kafka Producer
//On the consumer side. This will use a deserializer with the Kafka Consumer API
kalium.on(MyObject.class, myObject -> {
// do something with the object
}, "consumer_group");