RxJava:调度程序使用比预期更多的线程

RxJava: Scheduler using more threads than expected

我有以下代码:

ExecutorService poolA = newFixedThreadPool(10, threadFactory("Sched-A-%d"));
Scheduler schedulerA = Schedulers.from(poolA);
ExecutorService poolB = newFixedThreadPool(10, threadFactory("Sched-B-%d"));
Scheduler schedulerB = Schedulers.from(poolB);
ExecutorService poolC = newFixedThreadPool(10, threadFactory("Sched-C-%d"));
Scheduler schedulerC = Schedulers.from(poolC);
private ThreadFactory threadFactory(String pattern) {
    return new ThreadFactoryBuilder()
            .setNameFormat(pattern).build();
}
@Test
public void testSubscribedOnObservedOn() {
        log("Starting");
        final Observable<String> obs = simple();
        log("Created");
        obs
                .doOnNext(x -> log("Found 1: " + x))
                .observeOn(schedulerB)
                .doOnNext(x -> {Thread.sleep(100);log("Found 2: " + x);})
                .observeOn(schedulerC)
                .doOnNext(x -> log("Found 3: " + x))
                .subscribeOn(schedulerA)
                .subscribe(
                        x -> log("Got 1: " + x),
                        Throwable::printStackTrace,
                        () -> log("Completed")
                );
        try {
            Thread.sleep(3000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        log("Exiting");
    }

最后 2 个运算符将在 schedulerC 上 运行。我希望只有一个线程用于此。但输出显示 2.

0   | main  | Starting
72  | main  | Created
135 | Sched-A-0 | Subscribed
136 | Sched-A-0 | Found 1: A
138 | Sched-A-0 | Found 1: B
239 | Sched-B-0 | Found 2: A
239 | Sched-C-0 | Found 3: A
240 | Sched-C-0 | Got 1: A
341 | Sched-B-0 | Found 2: B
341 | Sched-C-1 | Found 3: B
341 | Sched-C-1 | Got 1: B
341 | Sched-C-1 | Completed
3129    | main  | Exiting

使用了 Sched-C-0、Sched-C-1。这种行为是否正确?

使用 Executors.newFixedThreadPool(),您将获得一个可以对任务提交做出反应的线程池。在当前 运行 线程完成之前,第二个线程可能会更快地唤醒以服务更多工作。无法强制池重用其集合中的同一线程。

相比之下,RxJava 的标准 Schedulers 使用单线程工作者,因此相同的底层线程将为 observeOn 中的操作提供服务。

当包装任意 Executor 时,RxJava 能做的最好的事情就是确保提交给 Scheduler.Worker 的任务不会重叠。