Observable.create() subscribeOn 和 observeOn 不工作

Observable.create() subscribeOn and observeOn not working

在我的Android应用程序中我使用了Rxjava2,但是出现了一些奇怪的情况。

在我的 Disposable 中,我打印以记录当前线程名称:

    //1
    Observable
            .create((ObservableOnSubscribe<UserModel>) e -> {
                //mock io
                if (phoneNumber.equals("HolyHigh") && password.equals("111111")) {
                    e.onNext(new UserModel());
                    e.onComplete();
                } else {
                    e.onError(new RuntimeException("Error."));
                }
            })
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .delay(1, TimeUnit.SECONDS)
            .subscribe(
                    r -> {
                        view.onLoginSuccess(new UserModel());
                        //test
                        String name = Thread.currentThread().getName();
                        Log.e("Thread Name", " Success Current Thread Name: " + name);
                    }
                    , e -> {
                        e.printStackTrace();
                        view.onLoginFailed(e.getMessage());
                        //test
                        String name = Thread.currentThread().getName();
                        Log.e("Thread Name", " Error Current Thread Name: " + name);
                    }
            );

然后登录:

Thread Name: Error Current Thread Name: RxComputationThreadPool-3

看起来 observeOn 和 subscribeOn 不起作用... 为什么不是主线程?

不过,我写了一些简单的...

//2
    Single.timer(1, TimeUnit.SECONDS)
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(r -> {
                Log.e("Single Thread Name", "Single Thread Name: " + Thread.currentThread().getName());
                CommonUtil.showToast(r + "~");
            });

这记录了:

Single Thread Name: Single Thread Name: main

我哪里错了?...

delay() 运算符默认在计算调度程序上运行,因此更改了上游事件以在计算线程上获得通知。

您只需在订阅之前更改它,只需将 observeOn .observeOn(AndroidSchedulers.mainThread()) 移动到 delay() 运算符之后。

顺便说一句,delay() 也有获取 Scheduler 参数的重载,可让您更改默认值 Scheduler