Axonframework,如何将 MessageDispatchInterceptor 与反应性存储库一起使用
Axonframework, how to use MessageDispatchInterceptor with reactive repository
我已经阅读了基于集合的 consistency validation blog 并且我想通过调度拦截器进行验证。我按照示例进行操作,但我使用的是反应式存储库,它对我来说并不适用。我试过阻止和不阻止。使用块它会抛出错误,但如果没有块它不会执行任何操作。这是我的代码。
class SubnetCommandInterceptor : MessageDispatchInterceptor<CommandMessage<*>> {
@Autowired
private lateinit var privateNetworkRepository: PrivateNetworkRepository
override fun handle(messages: List<CommandMessage<*>?>): BiFunction<Int, CommandMessage<*>, CommandMessage<*>> {
return BiFunction<Int, CommandMessage<*>, CommandMessage<*>> { index: Int?, command: CommandMessage<*> ->
if (CreateSubnetCommand::class.simpleName == (command.payloadType.simpleName)){
val interceptCommand = command.payload as CreateSubnetCommand
privateNetworkRepository
.findById(interceptCommand.privateNetworkId)
// ..some validation logic here ex.
// .filter { network -> network.isSubnetOverlap() }
.switchIfEmpty(Mono.error(IllegalArgumentException("Requested subnet is overlap with the previous subnet.")))
// .block() also doesn't work here it throws error
// block()/blockFirst()/blockLast() are blocking, which is not supported in thread reactor-
}
command
}
}
}
不真正推荐在消息调度程序中订阅反应式存储库,这可能会导致奇怪的行为作为底层 ThreadLocal(由 Axox 使用)is not adapted to be used in reactive programing
相反,请查看 Axon's Reactive Extension 和反应式拦截器部分。
例如你可能会做什么:
reactiveCommandGateway.registerDispatchInterceptor(
cmdMono -> cmdMono.flatMap(cmd->privateNetworkRepository
.findById(cmd.privateNetworkId))
.switchIfEmpty(
Mono.error(IllegalArgumentException("Requested subnet is overlap with the previous subnet."))
.then(cmdMono)));
我已经阅读了基于集合的 consistency validation blog 并且我想通过调度拦截器进行验证。我按照示例进行操作,但我使用的是反应式存储库,它对我来说并不适用。我试过阻止和不阻止。使用块它会抛出错误,但如果没有块它不会执行任何操作。这是我的代码。
class SubnetCommandInterceptor : MessageDispatchInterceptor<CommandMessage<*>> {
@Autowired
private lateinit var privateNetworkRepository: PrivateNetworkRepository
override fun handle(messages: List<CommandMessage<*>?>): BiFunction<Int, CommandMessage<*>, CommandMessage<*>> {
return BiFunction<Int, CommandMessage<*>, CommandMessage<*>> { index: Int?, command: CommandMessage<*> ->
if (CreateSubnetCommand::class.simpleName == (command.payloadType.simpleName)){
val interceptCommand = command.payload as CreateSubnetCommand
privateNetworkRepository
.findById(interceptCommand.privateNetworkId)
// ..some validation logic here ex.
// .filter { network -> network.isSubnetOverlap() }
.switchIfEmpty(Mono.error(IllegalArgumentException("Requested subnet is overlap with the previous subnet.")))
// .block() also doesn't work here it throws error
// block()/blockFirst()/blockLast() are blocking, which is not supported in thread reactor-
}
command
}
}
}
不真正推荐在消息调度程序中订阅反应式存储库,这可能会导致奇怪的行为作为底层 ThreadLocal(由 Axox 使用)is not adapted to be used in reactive programing
相反,请查看 Axon's Reactive Extension 和反应式拦截器部分。
例如你可能会做什么:
reactiveCommandGateway.registerDispatchInterceptor(
cmdMono -> cmdMono.flatMap(cmd->privateNetworkRepository
.findById(cmd.privateNetworkId))
.switchIfEmpty(
Mono.error(IllegalArgumentException("Requested subnet is overlap with the previous subnet."))
.then(cmdMono)));