Rx Java mergeDelayError 未按预期工作

Rx Java mergeDelayError not working as expected

我在 Android 应用程序中使用 RxJava Android。我正在使用 mergeDelayError 将两个 retro fit 网络调用组合成一个可观察对象,如果其中一个发出一个,它将处理发出的项目,如果有一个,则处理错误。这是行不通的,它只会在遇到错误时触发 onError 操作。现在为了测试这个,我转向了一个非常简单的例子,当我有一个 onError 调用时,仍然不会调用 successAction 。请参阅下面的示例。

Observable.mergeDelayError(
                Observable.error(new RuntimeException()),
                Observable.just("Hello")
            )
            .observeOn(AndroidSchedulers.mainThread())
            .subscribeOn(Schedulers.io())
            .finallyDo(completeAction)
            .subscribe(successAction, errorAction);

仅当我使用两个成功可观察对象时才会调用成功操作。我是否遗漏了一些关于 mergeDelayError 应该如何工作的东西?

编辑:

我发现,如果删除 observeOnsubscribeOn,一切都会按预期进行。我需要指定线程并认为这是使用 Rx 的全部意义所在。知道为什么指定那些 Schedulers 会破坏行为吗?

这看起来仍然像是 mergeDelayError 运算符中的一个错误,但我能够通过为每个可观察对象复制 observerOn 和 Subscribe on 来让它工作。

Observable.mergeDelayError(
            Observable.error(new RuntimeException())
                 .observeOn(AndroidSchedulers.mainThread())
                 .subscribeOn(Schedulers.io()),
            Observable.just("Hello")
                 .observeOn(AndroidSchedulers.mainThread())
                 .subscribeOn(Schedulers.io())
        )
        .finallyDo(completeAction)
        .subscribe(successAction, errorAction);

我认为您不会等待终端事件并且主线程在事件传递给您的观察者之前退出。以下测试通过了 RxJava 1.0.14:

@Test
public void errorDelayed() {
    TestSubscriber<Object> ts = TestSubscriber.create();
    Observable.mergeDelayError(
            Observable.error(new RuntimeException()),
            Observable.just("Hello")
        )
        .subscribeOn(Schedulers.io()).subscribe(ts);

    ts.awaitTerminalEvent();

    ts.assertError(RuntimeException.class);
    ts.assertValue("Hello");
}

使用 .observeOn(AndroidSchedulers.mainThread(), true) 而不是 .observeOn(AndroidSchedulers.mainThread()

public final Observable<T> observeOn(Scheduler scheduler, boolean delayError) {
        return observeOn(scheduler, delayError, RxRingBuffer.SIZE);
    }

以上是observeOn函数的签名。以下代码有效。

  Observable.mergeDelayError(
                Observable.error(new RuntimeException()),
                Observable.just("Hello")
        )
                .observeOn(AndroidSchedulers.mainThread(), true)
                .subscribeOn(Schedulers.io())
                .subscribe(new Subscriber<String>() {
                    @Override
                    public void onCompleted() {

                    }

                    @Override
                    public void onError(Throwable e) {

                    }

                    @Override
                    public void onNext(String s) {

                    }
                });

从 ConcatDelayError 线程获得这个技巧:https://github.com/ReactiveX/RxJava/issues/3908#issuecomment-217999009