RxJava2 - 间隔和调度程序

RxJava2 - interval and schedulers

假设我有一个时间间隔,并且我已经给它一个 computationScheduler。像这样:

Observable
    .interval(0, 1, TimeUnit.SECONDS, computationScheduler)
    .flatMap { ... }

那么,在 flatmap {...} 中发生的一切是否也会被安排在一个计算线程上?

在 Observable.interval(long initialDelay、long period、TimeUnit unit、Scheduler 调度程序)的源代码中,它说:

 * @param scheduler
 * the Scheduler on which the waiting happens and items are emitted

作为 RxJava 的初学者,我很难理解这条评论。我知道间隔 timer/waiting 逻辑发生在计算线程上。但是,最后一部分,关于被发射的物品,是否也意味着被发射的物品将在同一个线程上被消耗?或者是否需要 observeOn?像这样:

Observable
    .interval(0, 1, TimeUnit.SECONDS, computationScheduler)
    .observeOn(computationScheduler)
    .flatMap { ... }

如果我希望在计算线程上处理发射,是否需要 observeOn?

这个验证起来很简单:打印当前线程就可以看到operator是在哪个线程上执行的:

Observable.just(1, 2, 3, 4, 5, 6, 7, 8, 9)
    .flatMap(e -> {
        System.out.println("on flatmap: " + Thread.currentThread().getName());
        return Observable.just(e).map(x -> "--> " + x);
    })
    .subscribe(s -> {
        System.out.println("on subscribe: " + Thread.currentThread().getName());
        System.out.println(s);
    });

这将始终打印:

on subscribe: main
--> 1
on flatmap: main
on subscribe: main
--> 2
on flatmap: main
on subscribe: main
--> 3
on flatmap: main
on subscribe: main
--> 4
on flatmap: main
on subscribe: main
--> 5
on flatmap: main
on subscribe: main
--> 6
on flatmap: main
on subscribe: main
--> 7
on flatmap: main
on subscribe: main
--> 8
on flatmap: main
on subscribe: main
--> 9

按顺序处理,因为所有发生在一个线程中 -> main

observeOn会改变下游执行线程:

Observable.just(1, 2, 3, 4, 5, 6, 7, 8, 9)
    .observeOn(Schedulers.computation())
    .flatMap(e -> {
         System.out.println("on flatmap: " + Thread.currentThread().getName());
         return Observable.just(e).map(x -> "--> " + x);
     })
     .observeOn(Schedulers.io())
     .subscribe(s -> {
         System.out.println("on subscribe: " + Thread.currentThread().getName());
         System.out.println(s);
      });

这次的结果每次执行都会不同,但是flatmapsubscribe会在不同的线程中处理:

on flatmap: RxComputationThreadPool-1
on subscribe: RxCachedThreadScheduler-1

interval将充当observeOn并更改下游执行线程(调度程序):

Observable.interval(0, 1, TimeUnit.SECONDS, Schedulers.computation())
    .flatMap(e -> {
        System.out.println("on flatmap: " + Thread.currentThread().getName());
        return Observable.just(e).map(x -> "--> " + x);
    })
    .subscribe(s -> {
        System.out.println("on subscribe: " + Thread.currentThread().getName());
        System.out.println(s);
    });

这次在计算调度程序的一个线程内顺序执行:

on flatmap: RxComputationThreadPool-1
on subscribe: RxComputationThreadPool-1
--> 0
on flatmap: RxComputationThreadPool-1
on subscribe: RxComputationThreadPool-1
--> 1
on flatmap: RxComputationThreadPool-1
on subscribe: RxComputationThreadPool-1
--> 2
on flatmap: RxComputationThreadPool-1
on subscribe: RxComputationThreadPool-1
--> 3
...

interval 将默认使用计算调度程序,您不需要将其作为参数传递并且不需要 observeOn