RxJava2 - 间隔和调度程序
RxJava2 - interval and schedulers
假设我有一个时间间隔,并且我已经给它一个 computationScheduler。像这样:
Observable
.interval(0, 1, TimeUnit.SECONDS, computationScheduler)
.flatMap { ... }
那么,在 flatmap {...} 中发生的一切是否也会被安排在一个计算线程上?
在 Observable.interval(long initialDelay、long period、TimeUnit unit、Scheduler 调度程序)的源代码中,它说:
* @param scheduler
* the Scheduler on which the waiting happens and items are emitted
作为 RxJava 的初学者,我很难理解这条评论。我知道间隔 timer/waiting 逻辑发生在计算线程上。但是,最后一部分,关于被发射的物品,是否也意味着被发射的物品将在同一个线程上被消耗?或者是否需要 observeOn?像这样:
Observable
.interval(0, 1, TimeUnit.SECONDS, computationScheduler)
.observeOn(computationScheduler)
.flatMap { ... }
如果我希望在计算线程上处理发射,是否需要 observeOn?
这个验证起来很简单:打印当前线程就可以看到operator是在哪个线程上执行的:
Observable.just(1, 2, 3, 4, 5, 6, 7, 8, 9)
.flatMap(e -> {
System.out.println("on flatmap: " + Thread.currentThread().getName());
return Observable.just(e).map(x -> "--> " + x);
})
.subscribe(s -> {
System.out.println("on subscribe: " + Thread.currentThread().getName());
System.out.println(s);
});
这将始终打印:
on subscribe: main
--> 1
on flatmap: main
on subscribe: main
--> 2
on flatmap: main
on subscribe: main
--> 3
on flatmap: main
on subscribe: main
--> 4
on flatmap: main
on subscribe: main
--> 5
on flatmap: main
on subscribe: main
--> 6
on flatmap: main
on subscribe: main
--> 7
on flatmap: main
on subscribe: main
--> 8
on flatmap: main
on subscribe: main
--> 9
按顺序处理,因为所有发生在一个线程中 -> main
。
observeOn
会改变下游执行线程:
Observable.just(1, 2, 3, 4, 5, 6, 7, 8, 9)
.observeOn(Schedulers.computation())
.flatMap(e -> {
System.out.println("on flatmap: " + Thread.currentThread().getName());
return Observable.just(e).map(x -> "--> " + x);
})
.observeOn(Schedulers.io())
.subscribe(s -> {
System.out.println("on subscribe: " + Thread.currentThread().getName());
System.out.println(s);
});
这次的结果每次执行都会不同,但是flatmap
和subscribe
会在不同的线程中处理:
on flatmap: RxComputationThreadPool-1
on subscribe: RxCachedThreadScheduler-1
interval
将充当observeOn
并更改下游执行线程(调度程序):
Observable.interval(0, 1, TimeUnit.SECONDS, Schedulers.computation())
.flatMap(e -> {
System.out.println("on flatmap: " + Thread.currentThread().getName());
return Observable.just(e).map(x -> "--> " + x);
})
.subscribe(s -> {
System.out.println("on subscribe: " + Thread.currentThread().getName());
System.out.println(s);
});
这次在计算调度程序的一个线程内顺序执行:
on flatmap: RxComputationThreadPool-1
on subscribe: RxComputationThreadPool-1
--> 0
on flatmap: RxComputationThreadPool-1
on subscribe: RxComputationThreadPool-1
--> 1
on flatmap: RxComputationThreadPool-1
on subscribe: RxComputationThreadPool-1
--> 2
on flatmap: RxComputationThreadPool-1
on subscribe: RxComputationThreadPool-1
--> 3
...
interval
将默认使用计算调度程序,您不需要将其作为参数传递并且不需要 observeOn
假设我有一个时间间隔,并且我已经给它一个 computationScheduler。像这样:
Observable
.interval(0, 1, TimeUnit.SECONDS, computationScheduler)
.flatMap { ... }
那么,在 flatmap {...} 中发生的一切是否也会被安排在一个计算线程上?
在 Observable.interval(long initialDelay、long period、TimeUnit unit、Scheduler 调度程序)的源代码中,它说:
* @param scheduler
* the Scheduler on which the waiting happens and items are emitted
作为 RxJava 的初学者,我很难理解这条评论。我知道间隔 timer/waiting 逻辑发生在计算线程上。但是,最后一部分,关于被发射的物品,是否也意味着被发射的物品将在同一个线程上被消耗?或者是否需要 observeOn?像这样:
Observable
.interval(0, 1, TimeUnit.SECONDS, computationScheduler)
.observeOn(computationScheduler)
.flatMap { ... }
如果我希望在计算线程上处理发射,是否需要 observeOn?
这个验证起来很简单:打印当前线程就可以看到operator是在哪个线程上执行的:
Observable.just(1, 2, 3, 4, 5, 6, 7, 8, 9)
.flatMap(e -> {
System.out.println("on flatmap: " + Thread.currentThread().getName());
return Observable.just(e).map(x -> "--> " + x);
})
.subscribe(s -> {
System.out.println("on subscribe: " + Thread.currentThread().getName());
System.out.println(s);
});
这将始终打印:
on subscribe: main
--> 1
on flatmap: main
on subscribe: main
--> 2
on flatmap: main
on subscribe: main
--> 3
on flatmap: main
on subscribe: main
--> 4
on flatmap: main
on subscribe: main
--> 5
on flatmap: main
on subscribe: main
--> 6
on flatmap: main
on subscribe: main
--> 7
on flatmap: main
on subscribe: main
--> 8
on flatmap: main
on subscribe: main
--> 9
按顺序处理,因为所有发生在一个线程中 -> main
。
observeOn
会改变下游执行线程:
Observable.just(1, 2, 3, 4, 5, 6, 7, 8, 9)
.observeOn(Schedulers.computation())
.flatMap(e -> {
System.out.println("on flatmap: " + Thread.currentThread().getName());
return Observable.just(e).map(x -> "--> " + x);
})
.observeOn(Schedulers.io())
.subscribe(s -> {
System.out.println("on subscribe: " + Thread.currentThread().getName());
System.out.println(s);
});
这次的结果每次执行都会不同,但是flatmap
和subscribe
会在不同的线程中处理:
on flatmap: RxComputationThreadPool-1
on subscribe: RxCachedThreadScheduler-1
interval
将充当observeOn
并更改下游执行线程(调度程序):
Observable.interval(0, 1, TimeUnit.SECONDS, Schedulers.computation())
.flatMap(e -> {
System.out.println("on flatmap: " + Thread.currentThread().getName());
return Observable.just(e).map(x -> "--> " + x);
})
.subscribe(s -> {
System.out.println("on subscribe: " + Thread.currentThread().getName());
System.out.println(s);
});
这次在计算调度程序的一个线程内顺序执行:
on flatmap: RxComputationThreadPool-1
on subscribe: RxComputationThreadPool-1
--> 0
on flatmap: RxComputationThreadPool-1
on subscribe: RxComputationThreadPool-1
--> 1
on flatmap: RxComputationThreadPool-1
on subscribe: RxComputationThreadPool-1
--> 2
on flatmap: RxComputationThreadPool-1
on subscribe: RxComputationThreadPool-1
--> 3
...
interval
将默认使用计算调度程序,您不需要将其作为参数传递并且不需要 observeOn