Axon Framework:仅处理同一 JVM 实例发布的事件?
Axon Framework: Handle only events published by the same JVM instance?
嗨 Axon 框架社区,
我想听听您对如何正确解决以下问题的看法。
我的 Axon 测试设置
- 相同Spring启动应用程序的两个实例(使用没有 Axon 服务器的 axon-spring-boot-starter 4.4)
- 每个实例定期发布相同的事件
- 两个实例都连接到同一个 EventSource(单个 SQL 服务器实例使用 JpaEventStorageEngine)
- 每个实例都配置为使用 TrackingEventProcessors
- 每个实例都注册了相同的事件处理程序
我想达到的目标
我希望一个实例发布的事件只由同一个实例处理
If instance1 publishes eventX then only instance1 should handle eventX
到目前为止我尝试了什么
- 我可以使用 SubscribingEventProcessor 实现上述场景。不幸的是,这不是我的选择,因为我们希望可以选择重播事件以重建/添加新的查询模型。
- 我可以将每个实例的事件处理程序分配给不同的处理组。不幸的是,这没有用。也许是因为每个 TrackingEventProcessors 实例都处理相同的 EventStream ? - 虽然不太确定。
- 我可以实现一个 MessageHandlerInterceptor,它仅在事件源来自同一实例的情况下才会继续。这是我到目前为止实施的并且工作正常:
MessageHandlerInterceptor
class StackEventInterceptor(private val stackProperties: StackProperties) : MessageHandlerInterceptor<EventMessage<*>> {
override fun handle(unitOfWork: UnitOfWork<out EventMessage<*>>?, interceptorChain: InterceptorChain?): Any? {
val stackId = (unitOfWork?.message?.payload as SomeEvent).stackId
if(stackId == stackProperties.id){
interceptorChain?.proceed()
}
return null
}
}
@Configuration
class AxonConfiguration {
@Autowired
fun configure(eventProcessingConfigurer: EventProcessingConfigurer, stackProperties: StackProperties) {
val processingGroup = "processing-group-stack-${stackProperties.id}"
eventProcessingConfigurer.byDefaultAssignTo(processingGroup)
eventProcessingConfigurer.registerHandlerInterceptor(processingGroup) { StackEventInterceptor(stackProperties) }
}
}
有更好的解决方案吗?
我的印象是我当前的解决方案并不是最好的解决方案,因为理想情况下我希望只有属于特定实例的事件处理程序才由 TrackingEventProcessor 实例触发。
你会如何解决这个问题?
你在@thowimmer 遇到的场景很有趣。
我的第一个直觉是“改用 SubscribingEventProcessor
”。
但是,您指出这不是您的设置中的一个选项。
我认为对于处于相同场景中的其他人来说,了解为什么这不是一种选择是 非常 有价值的。所以,也许你可以详细说明一下(老实说,我也很好奇)。
现在针对您的问题案例,确保事件仅在同一 JVM 中处理。
将原点添加到事件绝对是您可以采取的 a 步骤,因为这允许一种合乎逻辑的过滤方式。 “此事件是否源自 my.origin()
?”如果没有,您只需忽略该事件并完成它,就这么简单。不过还有另一种方法可以实现这一点,稍后我会谈到。
不过,我认为过滤的地方正是您要寻找的地方。但首先,我想说明 为什么 您首先需要过滤。正如您所注意到的,TrackingEventProcessor
(TEP) 从所谓的 StreamableMessageSource
流式传输事件。 EventStore
是这样一个 StreamableMessageSource
的实现。当您将所有事件存储在同一个存储中时,它只会将所有内容流式传输到您的 TEP。由于您的事件是单个事件流的一部分,因此您需要在某个阶段过滤它们。使用 MessageHandlerInterceptor
会起作用,您甚至可以编写 HandlerEnhacnerDefinition
允许您向事件处理函数添加额外的行为。不管你怎么说,在当前的设置下,过滤需要在某个地方完成。 MessageHandlerInterceptor
可以说是最简单的地方。
但是,有一种不同的处理方式。为什么不将您的事件存储隔离为两个应用程序的两个不同实例?显然他们不需要相互读取,那么为什么要共享同一个事件存储呢?在不了解您的域的更多背景的情况下,我猜您实际上是在处理驻留在不同 bounded contexts 中的应用程序。 非常 简而言之,与两者共享所有内容的兴趣为零 applications/contexts,您只需非常有意识地彼此共享领域语言的特定部分。
请注意,支持 multiple contexts,在中间使用单个通信集线器,正是Axon Server 可以为您实现的。我并不是说你不能自己配置它,我过去也这样做过。但是,将这项工作留给其他人或其他人,使您无需配置基础架构,这将大大节省时间。
希望这可以帮助您了解我对此事的看法@thowimmer。
总结:
如果我们想要使用 TrackingEventProcessor
.
的功能,那么为两个实例使用相同的 EventStore 可能不是一个理想的设置
解决问题的选项:
- 每个应用程序实例的专用(非镜像)数据库实例。
- 使用 multiple contexts 使用 AxonServer。
如果我们决定使用 MessageHandlerInterceptor
解决应用程序级过滤的问题是最简单的解决方案。
感谢@Steven 交流想法。
编辑:
应用程序级别的解决方案使用 CorrelationDataProvider
和 MessageHandlerInterceptor
通过过滤掉不是源自同一进程的事件。
AxonConfiguration.kt
const val METADATA_KEY_PROCESS_ID = "pid"
const val PROCESSING_GROUP_PREFIX = "processing-group-pid"
@Configuration
class AxonConfiguration {
@Bean
fun processIdCorrelationDataProvider() = ProcessIdCorrelationDataProvider()
@Autowired
fun configureProcessIdEventHandlerInterceptor(eventProcessingConfigurer: EventProcessingConfigurer) {
val processingGroup = "$PROCESSING_GROUP_PREFIX-${ApplicationPid()}"
eventProcessingConfigurer.byDefaultAssignTo(processingGroup)
eventProcessingConfigurer.registerHandlerInterceptor(processingGroup) { ProcessIdEventHandlerInterceptor() }
}
}
class ProcessIdCorrelationDataProvider() : CorrelationDataProvider {
override fun correlationDataFor(message: Message<*>?): MutableMap<String, *> {
return mutableMapOf(METADATA_KEY_PROCESS_ID to ApplicationPid().toString())
}
}
class ProcessIdEventHandlerInterceptor : MessageHandlerInterceptor<EventMessage<*>> {
override fun handle(unitOfWork: UnitOfWork<out EventMessage<*>>?, interceptorChain: InterceptorChain?) {
val currentPid = ApplicationPid().toString()
val originPid = unitOfWork?.message?.metaData?.get(METADATA_KEY_PROCESS_ID)
if(currentPid == originPid){
interceptorChain?.proceed()
}
}
}
在 GitHub
上查看完整的演示项目
嗨 Axon 框架社区,
我想听听您对如何正确解决以下问题的看法。
我的 Axon 测试设置
- 相同Spring启动应用程序的两个实例(使用没有 Axon 服务器的 axon-spring-boot-starter 4.4)
- 每个实例定期发布相同的事件
- 两个实例都连接到同一个 EventSource(单个 SQL 服务器实例使用 JpaEventStorageEngine)
- 每个实例都配置为使用 TrackingEventProcessors
- 每个实例都注册了相同的事件处理程序
我想达到的目标
我希望一个实例发布的事件只由同一个实例处理
If instance1 publishes eventX then only instance1 should handle eventX
到目前为止我尝试了什么
- 我可以使用 SubscribingEventProcessor 实现上述场景。不幸的是,这不是我的选择,因为我们希望可以选择重播事件以重建/添加新的查询模型。
- 我可以将每个实例的事件处理程序分配给不同的处理组。不幸的是,这没有用。也许是因为每个 TrackingEventProcessors 实例都处理相同的 EventStream ? - 虽然不太确定。
- 我可以实现一个 MessageHandlerInterceptor,它仅在事件源来自同一实例的情况下才会继续。这是我到目前为止实施的并且工作正常: MessageHandlerInterceptor
class StackEventInterceptor(private val stackProperties: StackProperties) : MessageHandlerInterceptor<EventMessage<*>> {
override fun handle(unitOfWork: UnitOfWork<out EventMessage<*>>?, interceptorChain: InterceptorChain?): Any? {
val stackId = (unitOfWork?.message?.payload as SomeEvent).stackId
if(stackId == stackProperties.id){
interceptorChain?.proceed()
}
return null
}
}
@Configuration
class AxonConfiguration {
@Autowired
fun configure(eventProcessingConfigurer: EventProcessingConfigurer, stackProperties: StackProperties) {
val processingGroup = "processing-group-stack-${stackProperties.id}"
eventProcessingConfigurer.byDefaultAssignTo(processingGroup)
eventProcessingConfigurer.registerHandlerInterceptor(processingGroup) { StackEventInterceptor(stackProperties) }
}
}
有更好的解决方案吗?
我的印象是我当前的解决方案并不是最好的解决方案,因为理想情况下我希望只有属于特定实例的事件处理程序才由 TrackingEventProcessor 实例触发。
你会如何解决这个问题?
你在@thowimmer 遇到的场景很有趣。
我的第一个直觉是“改用 SubscribingEventProcessor
”。
但是,您指出这不是您的设置中的一个选项。
我认为对于处于相同场景中的其他人来说,了解为什么这不是一种选择是 非常 有价值的。所以,也许你可以详细说明一下(老实说,我也很好奇)。
现在针对您的问题案例,确保事件仅在同一 JVM 中处理。
将原点添加到事件绝对是您可以采取的 a 步骤,因为这允许一种合乎逻辑的过滤方式。 “此事件是否源自 my.origin()
?”如果没有,您只需忽略该事件并完成它,就这么简单。不过还有另一种方法可以实现这一点,稍后我会谈到。
不过,我认为过滤的地方正是您要寻找的地方。但首先,我想说明 为什么 您首先需要过滤。正如您所注意到的,TrackingEventProcessor
(TEP) 从所谓的 StreamableMessageSource
流式传输事件。 EventStore
是这样一个 StreamableMessageSource
的实现。当您将所有事件存储在同一个存储中时,它只会将所有内容流式传输到您的 TEP。由于您的事件是单个事件流的一部分,因此您需要在某个阶段过滤它们。使用 MessageHandlerInterceptor
会起作用,您甚至可以编写 HandlerEnhacnerDefinition
允许您向事件处理函数添加额外的行为。不管你怎么说,在当前的设置下,过滤需要在某个地方完成。 MessageHandlerInterceptor
可以说是最简单的地方。
但是,有一种不同的处理方式。为什么不将您的事件存储隔离为两个应用程序的两个不同实例?显然他们不需要相互读取,那么为什么要共享同一个事件存储呢?在不了解您的域的更多背景的情况下,我猜您实际上是在处理驻留在不同 bounded contexts 中的应用程序。 非常 简而言之,与两者共享所有内容的兴趣为零 applications/contexts,您只需非常有意识地彼此共享领域语言的特定部分。
请注意,支持 multiple contexts,在中间使用单个通信集线器,正是Axon Server 可以为您实现的。我并不是说你不能自己配置它,我过去也这样做过。但是,将这项工作留给其他人或其他人,使您无需配置基础架构,这将大大节省时间。
希望这可以帮助您了解我对此事的看法@thowimmer。
总结:
如果我们想要使用 TrackingEventProcessor
.
解决问题的选项:
- 每个应用程序实例的专用(非镜像)数据库实例。
- 使用 multiple contexts 使用 AxonServer。
如果我们决定使用 MessageHandlerInterceptor
解决应用程序级过滤的问题是最简单的解决方案。
感谢@Steven 交流想法。
编辑:
应用程序级别的解决方案使用 CorrelationDataProvider
和 MessageHandlerInterceptor
通过过滤掉不是源自同一进程的事件。
AxonConfiguration.kt
const val METADATA_KEY_PROCESS_ID = "pid"
const val PROCESSING_GROUP_PREFIX = "processing-group-pid"
@Configuration
class AxonConfiguration {
@Bean
fun processIdCorrelationDataProvider() = ProcessIdCorrelationDataProvider()
@Autowired
fun configureProcessIdEventHandlerInterceptor(eventProcessingConfigurer: EventProcessingConfigurer) {
val processingGroup = "$PROCESSING_GROUP_PREFIX-${ApplicationPid()}"
eventProcessingConfigurer.byDefaultAssignTo(processingGroup)
eventProcessingConfigurer.registerHandlerInterceptor(processingGroup) { ProcessIdEventHandlerInterceptor() }
}
}
class ProcessIdCorrelationDataProvider() : CorrelationDataProvider {
override fun correlationDataFor(message: Message<*>?): MutableMap<String, *> {
return mutableMapOf(METADATA_KEY_PROCESS_ID to ApplicationPid().toString())
}
}
class ProcessIdEventHandlerInterceptor : MessageHandlerInterceptor<EventMessage<*>> {
override fun handle(unitOfWork: UnitOfWork<out EventMessage<*>>?, interceptorChain: InterceptorChain?) {
val currentPid = ApplicationPid().toString()
val originPid = unitOfWork?.message?.metaData?.get(METADATA_KEY_PROCESS_ID)
if(currentPid == originPid){
interceptorChain?.proceed()
}
}
}
在 GitHub
上查看完整的演示项目