RxJava:subscribeOn 和 observeOn 没有按预期工作
RxJava: subscribeOn and observeOn not working as expected
也许我只是很了解subscribeOn
和observeOn
的内部工作原理,但我最近遇到了一些非常奇怪的事情。我的印象是,subscribeOn
确定调度程序最初从哪里开始处理(特别是当我们,例如,有很多 map
s 改变数据流)然后 observeOn
可以在那些 maps
之间的任何地方使用,以在适当的时候更改调度程序(首先进行网络,然后计算,最后更改 UI 线程)。
但是,我注意到当不直接将这些调用链接到我的 Observable 或 Single 时,它不会起作用。这是一个最小的工作示例 JUnit 测试:
import org.junit.Test;
import rx.Single;
import rx.schedulers.Schedulers;
public class SubscribeOnTest {
@Test public void not_working_as_expected() throws Exception {
Single<Integer> single = Single.<Integer>create(singleSubscriber -> {
System.out.println("Doing some computation on thread " + Thread.currentThread().getName());
int i = 1;
singleSubscriber.onSuccess(i);
});
single.subscribeOn(Schedulers.computation()).observeOn(Schedulers.io());
single.subscribe(integer -> {
System.out.println("Observing on thread " + Thread.currentThread().getName());
});
System.out.println("Doing test on thread " + Thread.currentThread().getName());
Thread.sleep(1000);
}
@Test public void working_as_expected() throws Exception {
Single<Integer> single = Single.<Integer>create(singleSubscriber -> {
System.out.println("Doing some computation on thread " + Thread.currentThread().getName());
int i = 1;
singleSubscriber.onSuccess(i);
}).subscribeOn(Schedulers.computation()).observeOn(Schedulers.io());
single.subscribe(integer -> {
System.out.println("Observing on thread " + Thread.currentThread().getName());
});
System.out.println("Doing test on thread " + Thread.currentThread().getName());
Thread.sleep(1000);
}
}
测试not_working_as_expected()
给出了以下输出
Doing some computation on thread main
Observing on thread main
Doing test on thread main
而 working_as_expected()
给了我
Doing some computation on thread RxComputationScheduler-1
Doing test on thread main
Observing on thread RxIoScheduler-2
唯一的区别是在第一个测试中,在创建单例之后有一个分号,然后才应用调度程序,在工作示例中,方法调用直接链接到单例的创建.但这不应该无关紧要吗?
运算符执行的所有 "modifications" 都是 不可变的,这意味着它们 return 接收到的新流以不同于前一个的方式通知。由于您刚刚调用了 subscribeOn
和 observeOn
运算符并且没有存储它们的结果,因此稍后进行的订阅在未更改的流中。
旁注: 我不太理解您对 subscribeOn
行为的定义。如果您的意思是地图操作员会以某种方式受到它的影响,那是不正确的。 subscribeOn
定义了一个调度器,在其上调用了OnSubscribe函数。在您的情况下,您传递给 create()
方法的函数。另一方面,observeOn
定义了每个连续流(由应用运算符 return 编辑的流)正在处理来自上游的排放的调度程序。
.subscribeOn(*)
- returns 你是 Observable
的新实例,但在第一次测试中你只是忽略它然后订阅原始 Observable
,显然默认情况下订阅默认情况下,主线程。
也许我只是很了解subscribeOn
和observeOn
的内部工作原理,但我最近遇到了一些非常奇怪的事情。我的印象是,subscribeOn
确定调度程序最初从哪里开始处理(特别是当我们,例如,有很多 map
s 改变数据流)然后 observeOn
可以在那些 maps
之间的任何地方使用,以在适当的时候更改调度程序(首先进行网络,然后计算,最后更改 UI 线程)。
但是,我注意到当不直接将这些调用链接到我的 Observable 或 Single 时,它不会起作用。这是一个最小的工作示例 JUnit 测试:
import org.junit.Test;
import rx.Single;
import rx.schedulers.Schedulers;
public class SubscribeOnTest {
@Test public void not_working_as_expected() throws Exception {
Single<Integer> single = Single.<Integer>create(singleSubscriber -> {
System.out.println("Doing some computation on thread " + Thread.currentThread().getName());
int i = 1;
singleSubscriber.onSuccess(i);
});
single.subscribeOn(Schedulers.computation()).observeOn(Schedulers.io());
single.subscribe(integer -> {
System.out.println("Observing on thread " + Thread.currentThread().getName());
});
System.out.println("Doing test on thread " + Thread.currentThread().getName());
Thread.sleep(1000);
}
@Test public void working_as_expected() throws Exception {
Single<Integer> single = Single.<Integer>create(singleSubscriber -> {
System.out.println("Doing some computation on thread " + Thread.currentThread().getName());
int i = 1;
singleSubscriber.onSuccess(i);
}).subscribeOn(Schedulers.computation()).observeOn(Schedulers.io());
single.subscribe(integer -> {
System.out.println("Observing on thread " + Thread.currentThread().getName());
});
System.out.println("Doing test on thread " + Thread.currentThread().getName());
Thread.sleep(1000);
}
}
测试not_working_as_expected()
给出了以下输出
Doing some computation on thread main
Observing on thread main
Doing test on thread main
而 working_as_expected()
给了我
Doing some computation on thread RxComputationScheduler-1
Doing test on thread main
Observing on thread RxIoScheduler-2
唯一的区别是在第一个测试中,在创建单例之后有一个分号,然后才应用调度程序,在工作示例中,方法调用直接链接到单例的创建.但这不应该无关紧要吗?
运算符执行的所有 "modifications" 都是 不可变的,这意味着它们 return 接收到的新流以不同于前一个的方式通知。由于您刚刚调用了 subscribeOn
和 observeOn
运算符并且没有存储它们的结果,因此稍后进行的订阅在未更改的流中。
旁注: 我不太理解您对 subscribeOn
行为的定义。如果您的意思是地图操作员会以某种方式受到它的影响,那是不正确的。 subscribeOn
定义了一个调度器,在其上调用了OnSubscribe函数。在您的情况下,您传递给 create()
方法的函数。另一方面,observeOn
定义了每个连续流(由应用运算符 return 编辑的流)正在处理来自上游的排放的调度程序。
.subscribeOn(*)
- returns 你是 Observable
的新实例,但在第一次测试中你只是忽略它然后订阅原始 Observable
,显然默认情况下订阅默认情况下,主线程。