来自不同线程的重试时间

retryWhen from different threads

我对 RxJava 有一个不太理解的情况。我有这个可观察的设置,效果很好。

Observable.create(subscriber -> makeWebCall())
    .subscribeOn(Schedulers.newThread())
    .observeOn(AndroidScheduler.newThread())
    .retryWhen(observable ->
                observable.flatMap(throwable -> {
                    if (throwable instanceof UserRecoverableException) {
                        return Observable.from(someUIFlowObservable());
                    }
                    return Observable.error(throwable);
                }))
    .subscribe(response -> response.doSomething(), t -> throwError(t));

所以我进行了一次网络调用,如果出现可恢复的异常,我在 UI 线程中进行了一些更改后重试了该调用。

但是,当然,这一切都是由于某些 UI 交互(即单击按钮)而发生的。那里还有一个可观察的链,所以我修改了那个链以也包括这个调用,就像这样:

Rx.click(someButton)
    .switchMap(o -> getSomeStringPreferenceObservable())
    .subscribeOn(AndroidSchedulers.mainThread())
    .observeOn(Schedulers.newThread())
    .switchMap(str -> Observable.create(subscriber - >makeWebCall()))
    .observeOn(AndroidScheduler.mainThread())
    .retryWhen(observable ->
                observable.flatMap(throwable -> {
                    if (throwable instanceof UserRecoverableException) {
                        return Observable.from(someUIFlowObservable());
                    }
                    return Observable.error(throwable);
                }))
    .subscribe(response -> response.doSomething(), t -> throwError(t));

现在,retryWhen 不起作用。很明显,两者之间的最大区别在于 subscribeOn 方法。但是我不确定我是否了解正在发生的事情以及我可以做些什么来获得预期的结果。

我认为它有效,您只需再次单击该按钮即可。

原因是 retryWhen 重新订阅了整个链,但是 Rx.click(someButton) 没有发出任何新的东西(在重新订阅之后)所以其余的将不会被处理。

如果 UserRecoverableExceptionmakeWebCall() 发出,则将重试移动到该可观察对象中:

.switchMap(str -> Observable.create(subscriber - > makeWebCall())
    .observeOn(Schedulers.newThread())
    .retryWhen(observable -> observable.flatMap(throwable -> {
      if (throwable instanceof UserRecoverableException) {
        return Observable.from(someUIFlowObservable())
            .subscribeOn(mainThread());
      }
      return Observable.error(throwable);
    }))
)

Ps: 在较新的 RxAndroid 中没有 AndroidScheduler.newThread() :)

Ps 2:如果您 运行 来自 Activity 或 Fragment 的可观察对象,则无需调用 .subscribeOn(AndroidSchedulers.mainThread())