如何在重试之前在 RxJava2 (RxJava3) 链中使超时运算符触发一次异常

How to make timeout operator fire exception one time in RxJava2 (RxJava3) chain before retry

我正在尝试扩展我的自定义 FlowableTransformer 的功能,我用它有条件地从一个 Flowable 切换到另一个,由提供的 Function<T, Flowable<R>> 返回,即在我的 [=15] =] 如果初始项目符合某些条件,我将项目发送到服务器并切换到可流动的服务器响应:

/**
 * Switches from upstreamFlowable to anotherFlowable
 * using switchFunction if item from upstreamFlowable
 * matches test condition
 *
 * usage:
 *       upstreamFlowable
 *       .compose(RxSwitchFlowablesOnCondition(condition, switchFunction))
 **/
open class
RxSwitchFlowablesOnCondition<T, R>(private val condition : Predicate<T>,
                                   private val switchFunction : Function<T, Flowable<R>>) : FlowableTransformer<T, R>
{
  override fun
  apply(upstreamFlowable : Flowable<T>) : Publisher<R>
  {
    return upstreamFlowable
    .compose(RxFlowableOnIo<T>())
    .filter {emittedItem : T ->
      condition.test(emittedItem)
    }
    .switchMap {emittedItem : T ->
      val anotherFlowable = switchFunction.apply(emittedItem)

      anotherFlowable
      //.timeout(CONNECTION_TIMEOUT_SECONDS, SECONDS) // #1
    }
    //.timeout(CONNECTION_TIMEOUT_SECONDS, SECONDS) // #2
  }
}

要在发生错误时重新启动链,在我的 android 应用程序中,我使用 retry 运算符:

upstreamFlowable
.compose(RxSwitchFlowablesOnCondition(condition, switchFunction))
.retry() // to resubscribe on error
.subscribe(...)

我想做的是添加timeout逻辑,例如,如果upstreamFlowable发出一个项目和在anotherFlowable获得响应之间的时间大于CONNECTION_TIMEOUT_SECONDS,我想触发错误并重新订阅 upstreamFlowable,这样我就可以等待下一个 emittedItem

我有:

  1. 如果我取消对行 #1 的注释,则超时会产生一个 TimeoutException,重试重新订阅链,我会继续每隔 CONNECTION_TIMEOUT_SECONDS 时间段收到超时异常,即使没有下一个发出的项目. 如何跳过它们?
  2. 如果我取消注释行 #2,那么我在 CONNECTION_TIMEOUT_SECONDS 订阅后 TimeoutException 秒后开始接收,而不是自项目发布后。 如何解决?

需要你的帮助。提前致谢!

要获得您想要的行为,您只需要在 switchMap 块中进行小的更改。所以,我将你的 apply 实现修改成了这个

override fun
  apply(upstreamFlowable : Flowable<T>) : Publisher<R>
  {
    return upstreamFlowable
    .compose(RxFlowableOnIo<T>())
    .filter {emittedItem : T ->
      condition.test(emittedItem)
    }
    .switchMap {
        Flowable.just(it)
            .switchMap { emittedItem : T ->
                val anotherFlowable = switchFunction(emittedItem)

                 anotherFlowable
              }
             .timeout(CONNECTION_TIMEOUT_SECONDS, SECONDS)
         }
  }

通过这样做,timeout 将在发出值时开始计数。

更新 为了进行测试,我准备了简单的代码并添加了日志:

  1. upstream 将在 2 秒后发射一次
  2. anotherFlowable 将在 1 秒后发射一次
  3. 超时为 500 毫秒

// 示例用法

val upstream = Flowable.timer(2, TimeUnit.SECONDS)
        upstream
                .doOnSubscribe { println("subscribed ${System.currentTimeMillis()}") }
                .compose(
                        RxSwitchFlowablesOnCondition<Long, Long>(
                                Predicate { true },
                                { Flowable.timer(1, TimeUnit.SECONDS)})
                )
                .doOnError { println("timeout ${System.currentTimeMillis()}") }
                .retry()
                .subscribe()

// RxSwitchFlowablesOnCondition

open class
    RxSwitchFlowablesOnCondition<T, R>(private val condition : Predicate<T>,
                                       private val switchFunction : (T) -> Flowable<R>) : FlowableTransformer<T, R>
    {
        override fun
                apply(upstreamFlowable : Flowable<T>) : Publisher<R>
        {
            return upstreamFlowable
                    .observeOn(Schedulers.io())
                    .filter {emittedItem : T ->
                        condition.test(emittedItem)
                    }
                    .switchMap {
                        Flowable.just(it)
                                .doOnNext { println("onNext ${System.currentTimeMillis()}") }
                                .switchMap { emittedItem : T ->
                                    val anotherFlowable = switchFunction(emittedItem)

                                    anotherFlowable
                                }
                                .timeout(500, TimeUnit.MILLISECONDS)
                    }

        }
    }

这是示例输出日志

subscribed 1569286225769
onNext 1569286227790
timeout 1569286228292  // around 500 ms after onNext
subscribed 1569286228295
onNext 1569286230297
timeout 1569286230800
subscribed 1569286230800
onNext 1569286232803
timeout 1569286233306
subscribed 1569286233306

是否符合您的预期?