RxJava:subscribeOn 和 observeOn 没有按预期工作

RxJava: subscribeOn and observeOn not working as expected

也许我只是很了解subscribeOnobserveOn的内部工作原理,但我最近遇到了一些非常奇怪的事情。我的印象是,subscribeOn 确定调度程序最初从哪里开始处理(特别是当我们,例如,有很多 maps 改变数据流)然后 observeOn 可以在那些 maps 之间的任何地方使用,以在适当的时候更改调度程序(首先进行网络,然后计算,最后更改 UI 线程)。

但是,我注意到当不直接将这些调用链接到我的 Observable 或 Single 时,它​​不会起作用。这是一个最小的工作示例 JUnit 测试:

import org.junit.Test;
import rx.Single;
import rx.schedulers.Schedulers;

public class SubscribeOnTest {

  @Test public void not_working_as_expected() throws Exception {
    Single<Integer> single = Single.<Integer>create(singleSubscriber -> {
      System.out.println("Doing some computation on thread " + Thread.currentThread().getName());
      int i = 1;
      singleSubscriber.onSuccess(i);
    });
    single.subscribeOn(Schedulers.computation()).observeOn(Schedulers.io());

    single.subscribe(integer -> {
      System.out.println("Observing on thread " + Thread.currentThread().getName());
    });
    System.out.println("Doing test on thread " + Thread.currentThread().getName());
    Thread.sleep(1000);
  }

  @Test public void working_as_expected() throws Exception {
    Single<Integer> single = Single.<Integer>create(singleSubscriber -> {
      System.out.println("Doing some computation on thread " + Thread.currentThread().getName());
      int i = 1;
      singleSubscriber.onSuccess(i);
    }).subscribeOn(Schedulers.computation()).observeOn(Schedulers.io());

    single.subscribe(integer -> {
      System.out.println("Observing on thread " + Thread.currentThread().getName());
    });
    System.out.println("Doing test on thread " + Thread.currentThread().getName());
    Thread.sleep(1000);
  }
}

测试not_working_as_expected() 给出了以下输出

Doing some computation on thread main
Observing on thread main
Doing test on thread main

working_as_expected() 给了我

Doing some computation on thread RxComputationScheduler-1
Doing test on thread main
Observing on thread RxIoScheduler-2

唯一的区别是在第一个测试中,在创建单例之后有一个分号,然后才应用调度程序,在工作示例中,方法调用直接链接到单例的创建.但这不应该无关紧要吗?

运算符执行的所有 "modifications" 都是 不可变的,这意味着它们 return 接收到的新流以不同于前一个的方式通知。由于您刚刚调用了 subscribeOnobserveOn 运算符并且没有存储它们的结果,因此稍后进行的订阅在未更改的流中。

旁注: 我不太理解您对 subscribeOn 行为的定义。如果您的意思是地图操作员会以某种方式受到它的影响,那是不正确的。 subscribeOn定义了一个调度器,在其上调用了OnSubscribe函数。在您的情况下,您传递给 create() 方法的函数。另一方面,observeOn 定义了每个连续流(由应用运算符 return 编辑的流)正在处理来自上游的排放的调度程序。

.subscribeOn(*) - returns 你是 Observable 的新实例,但在第一次测试中你只是忽略它然后订阅原始 Observable,显然默认情况下订阅默认情况下,主线程。