如何根据项目反应器中的当前可观察值动态选择转换函数?

How to dynamically chose a transform function based on current observable in project reactor?

你好亲爱的响应式程序员,我开始学习项目反应堆,但我仍然很难弄清楚什么时候使用什么运算符。我想通了,如果我想用可重用的部分来定义反应器流,我可以使用 transform 运算符。我想要实现的是基于当前的可观察上下文使用这种流函数的特定实现。对于 Mono 流程,我想到了这个,但我不确定它是否是一个好的解决方案:

所以这是流程的一部分

class CloudeventOverDelegatorRoute(
  val fromHttpToDelegatorRoute: FromHttpToDelegatorRoute,
  val delegatorProvider: DelegatorProvider,
  val fromDelegatorToHttpRoute: FromDelegatorToHttpRoute
): MessageRoute<HttpBaseMessage, HttpResponseMessage> {

  override fun isHandlerFor(context: RouteContext): Boolean {
    return fromHttpToDelegatorRoute.isHandlerFor(context)
      && fromDelegatorToHttpRoute.isHandlerFor(context)
  }

  override fun buildPipeline(input: Mono<RoutableMessage<HttpBaseMessage>>): Mono<RoutableMessage<HttpResponseMessage>> {
    var dynamicallyDeterminedDelegator: Delegator? = null
    return input.transform {
      fromHttpToDelegatorRoute.buildPipeline(input)
    }.handle<RoutableMessage<InternalMessage>> { t, u ->
      dynamicallyDeterminedDelegator = delegatorProvider.provideDelegatorFor(t.routeContext)
      u.next(t)
      u.complete()
    }.transform {
      dynamicallyDeterminedDelegator!!.sendDelegated(it)
    }.transform { fromDelegatorToHttpRoute.buildPipeline(it) }
  }

}

这里是动态选择逻辑

interface DelegatorProvider {

  fun provideDelegatorFor(context: RouteContext): Delegator

}

class FirstMatchDelegatorProvider(
  private val delegators: List<Delegator>
): DelegatorProvider {

  override fun provideDelegatorFor(context: RouteContext): Delegator {
    return delegators.firstOrNull {
      it.isHandlerFor(context)
    }?: throw IllegalStateException("No Delegator route available for context: $context")
  }

}

这是委托人提供整个流程的重要子部分

interface Delegator {

  fun isHandlerFor(context: RouteContext): Boolean

  fun sendDelegated(input: Mono<RoutableMessage<InternalMessage>>): Mono<RoutableMessage<InternalStatusMessage>>

}

你怎么看?你会怎么解决?

这种方法存在问题,因为它依赖于 共享状态dynamicallyDeterminedDelegator 变量)。如果多个订阅者订阅返回的 Mono,他们可以覆盖彼此的委托者。也许(多个订阅)不会在您的应用程序中发生,但无论如何这是一个非常糟糕的习惯。

看起来您可以从 RoutableMessage<InternalMessage> 派生出 delegator,而且您真的不需要保留该委托人。

一次性解析委托并将其应用于 routableMessage 的最简单方法就是使用 flatMap。请参阅下面的(伪)java 代码:

.flatMap(routableMessage -> {
    val delegator = delegatorProvider.provideDelegatorFor(routableMessage.routeContext);
    return delegator.sendDelegated(routableMessage);
})