Axon Framework:仅处理同一 JVM 实例发布的事件?

Axon Framework: Handle only events published by the same JVM instance?

嗨 Axon 框架社区,

我想听听您对如何正确解决以下问题的看法。

我的 Axon 测试设置

我想达到的目标

我希望一个实例发布的事件只由同一个实例处理

If instance1 publishes eventX then only instance1 should handle eventX

到目前为止我尝试了什么

class StackEventInterceptor(private val stackProperties: StackProperties) : MessageHandlerInterceptor<EventMessage<*>> {

    override fun handle(unitOfWork: UnitOfWork<out EventMessage<*>>?, interceptorChain: InterceptorChain?): Any? {
        val stackId = (unitOfWork?.message?.payload as SomeEvent).stackId
        if(stackId == stackProperties.id){
            interceptorChain?.proceed()
        }
        return null
    }
}
@Configuration
class AxonConfiguration {

    @Autowired
    fun configure(eventProcessingConfigurer: EventProcessingConfigurer, stackProperties: StackProperties) {
        val processingGroup = "processing-group-stack-${stackProperties.id}"
        eventProcessingConfigurer.byDefaultAssignTo(processingGroup)
        eventProcessingConfigurer.registerHandlerInterceptor(processingGroup) { StackEventInterceptor(stackProperties) }
    }
}

有更好的解决方案吗?

我的印象是我当前的解决方案并不是最好的解决方案,因为理想情况下我希望只有属于特定实例的事件处理程序才由 TrackingEventProcessor 实例触发。

你会如何解决这个问题?

你在@thowimmer 遇到的场景很有趣。 我的第一个直觉是“改用 SubscribingEventProcessor”。 但是,您指出这不是您的设置中的一个选项。 我认为对于处于相同场景中的其他人来说,了解为什么这不是一种选择是 非常 有价值的。所以,也许你可以详细说明一下(老实说,我也很好奇)。

现在针对您的问题案例,确保事件仅在同一 JVM 中处理。 将原点添加到事件绝对是您可以采取的 a 步骤,因为这允许一种合乎逻辑的过滤方式。 “此事件是否源自 my.origin()?”如果没有,您只需忽略该事件并完成它,就这么简单。不过还有另一种方法可以实现这一点,稍后我会谈到。

不过,我认为过滤的地方正是您要寻找的地方。但首先,我想说明 为什么 您首先需要过滤。正如您所注意到的,TrackingEventProcessor (TEP) 从所谓的 StreamableMessageSource 流式传输事件。 EventStore 是这样一个 StreamableMessageSource 的实现。当您将所有事件存储在同一个存储中时,它只会将所有内容流式传输到您的 TEP。由于您的事件是单个事件流的一部分,因此您需要在某个阶段过滤它们。使用 MessageHandlerInterceptor 会起作用,您甚至可以编写 HandlerEnhacnerDefinition 允许您向事件处理函数添加额外的行为。不管你怎么说,在当前的设置下,过滤需要在某个地方完成。 MessageHandlerInterceptor 可以说是最简单的地方。

但是,有一种不同的处理方式。为什么不将您的事件存储隔离为两个应用程序的两个不同实例?显然他们不需要相互读取,那么为什么要共享同一个事件存储呢?在不了解您的域的更多背景的情况下,我猜您实际上是在处理驻留在不同 bounded contexts 中的应用程序。 非常 简而言之,与两者共享所有内容的兴趣为零 applications/contexts,您只需非常有意识地彼此共享领域语言的特定部分。

请注意,支持 multiple contexts,在中间使用单个通信集线器,正是Axon Server 可以为您实现的。我并不是说你不能自己配置它,我过去也这样做过。但是,将这项工作留给其他人或其他人,使您无需配置基础架构,这将大大节省时间。

希望这可以帮助您了解我对此事的看法@thowimmer。

总结:

如果我们想要使用 TrackingEventProcessor.

的功能,那么为两个实例使用相同的 EventStore 可能不是一个理想的设置

解决问题的选项:

  • 每个应用程序实例的专用(非镜像)数据库实例。
  • 使用 multiple contexts 使用 AxonServer。

如果我们决定使用 MessageHandlerInterceptor 解决应用程序级过滤的问题是最简单的解决方案。

感谢@Steven 交流想法。


编辑:

应用程序级别的解决方案使用 CorrelationDataProviderMessageHandlerInterceptor 通过过滤掉不是源自同一进程的事件。

AxonConfiguration.kt

const val METADATA_KEY_PROCESS_ID = "pid"
const val PROCESSING_GROUP_PREFIX = "processing-group-pid"

@Configuration
class AxonConfiguration {

    @Bean
    fun processIdCorrelationDataProvider() = ProcessIdCorrelationDataProvider()

    @Autowired
    fun configureProcessIdEventHandlerInterceptor(eventProcessingConfigurer: EventProcessingConfigurer) {
        val processingGroup = "$PROCESSING_GROUP_PREFIX-${ApplicationPid()}"
        eventProcessingConfigurer.byDefaultAssignTo(processingGroup)
        eventProcessingConfigurer.registerHandlerInterceptor(processingGroup) { ProcessIdEventHandlerInterceptor() }
    }
}

class ProcessIdCorrelationDataProvider() : CorrelationDataProvider {
    override fun correlationDataFor(message: Message<*>?): MutableMap<String, *> {
        return mutableMapOf(METADATA_KEY_PROCESS_ID to ApplicationPid().toString())
    }
}

class ProcessIdEventHandlerInterceptor : MessageHandlerInterceptor<EventMessage<*>> {
    override fun handle(unitOfWork: UnitOfWork<out EventMessage<*>>?, interceptorChain: InterceptorChain?) {
        val currentPid = ApplicationPid().toString()
        val originPid = unitOfWork?.message?.metaData?.get(METADATA_KEY_PROCESS_ID)
        if(currentPid == originPid){
            interceptorChain?.proceed()
        }
    }
}

GitHub

上查看完整的演示项目