Axonframework,如何将 MessageDispatchInterceptor 与反应性存储库一起使用

Axonframework, how to use MessageDispatchInterceptor with reactive repository

我已经阅读了基于集合的 consistency validation blog 并且我想通过调度拦截器进行验证。我按照示例进行操作,但我使用的是反应式存储库,它对我来说并不适用。我试过阻止和不阻止。使用块它会抛出错误,但如果没有块它不会执行任何操作。这是我的代码。

class SubnetCommandInterceptor : MessageDispatchInterceptor<CommandMessage<*>> {

  @Autowired
  private lateinit var privateNetworkRepository: PrivateNetworkRepository

  override fun handle(messages: List<CommandMessage<*>?>): BiFunction<Int, CommandMessage<*>, CommandMessage<*>> {
    return BiFunction<Int, CommandMessage<*>, CommandMessage<*>> { index: Int?, command: CommandMessage<*> ->
      if (CreateSubnetCommand::class.simpleName == (command.payloadType.simpleName)){
        val interceptCommand = command.payload as CreateSubnetCommand
        privateNetworkRepository
          .findById(interceptCommand.privateNetworkId)
          // ..some validation logic here ex.
          // .filter { network -> network.isSubnetOverlap() }
          .switchIfEmpty(Mono.error(IllegalArgumentException("Requested subnet is overlap with the previous subnet.")))
          // .block() also doesn't work here it throws error
         // block()/blockFirst()/blockLast() are blocking, which is not supported in thread reactor-
      }
      command
    }
  }
}

不真正推荐在消息调度程序中订阅反应式存储库,这可能会导致奇怪的行为作为底层 ThreadLocal(由 Axox 使用)is not adapted to be used in reactive programing

相反,请查看 Axon's Reactive Extension 和反应式拦截器部分。

例如你可能会做什么:

reactiveCommandGateway.registerDispatchInterceptor(
        cmdMono -> cmdMono.flatMap(cmd->privateNetworkRepository
      .findById(cmd.privateNetworkId))
.switchIfEmpty(
Mono.error(IllegalArgumentException("Requested subnet is overlap with the previous subnet."))
.then(cmdMono)));