RxJava:调度程序使用比预期更多的线程
RxJava: Scheduler using more threads than expected
我有以下代码:
ExecutorService poolA = newFixedThreadPool(10, threadFactory("Sched-A-%d"));
Scheduler schedulerA = Schedulers.from(poolA);
ExecutorService poolB = newFixedThreadPool(10, threadFactory("Sched-B-%d"));
Scheduler schedulerB = Schedulers.from(poolB);
ExecutorService poolC = newFixedThreadPool(10, threadFactory("Sched-C-%d"));
Scheduler schedulerC = Schedulers.from(poolC);
private ThreadFactory threadFactory(String pattern) {
return new ThreadFactoryBuilder()
.setNameFormat(pattern).build();
}
@Test
public void testSubscribedOnObservedOn() {
log("Starting");
final Observable<String> obs = simple();
log("Created");
obs
.doOnNext(x -> log("Found 1: " + x))
.observeOn(schedulerB)
.doOnNext(x -> {Thread.sleep(100);log("Found 2: " + x);})
.observeOn(schedulerC)
.doOnNext(x -> log("Found 3: " + x))
.subscribeOn(schedulerA)
.subscribe(
x -> log("Got 1: " + x),
Throwable::printStackTrace,
() -> log("Completed")
);
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
log("Exiting");
}
最后 2 个运算符将在 schedulerC 上 运行。我希望只有一个线程用于此。但输出显示 2.
0 | main | Starting
72 | main | Created
135 | Sched-A-0 | Subscribed
136 | Sched-A-0 | Found 1: A
138 | Sched-A-0 | Found 1: B
239 | Sched-B-0 | Found 2: A
239 | Sched-C-0 | Found 3: A
240 | Sched-C-0 | Got 1: A
341 | Sched-B-0 | Found 2: B
341 | Sched-C-1 | Found 3: B
341 | Sched-C-1 | Got 1: B
341 | Sched-C-1 | Completed
3129 | main | Exiting
使用了 Sched-C-0、Sched-C-1。这种行为是否正确?
使用 Executors.newFixedThreadPool()
,您将获得一个可以对任务提交做出反应的线程池。在当前 运行 线程完成之前,第二个线程可能会更快地唤醒以服务更多工作。无法强制池重用其集合中的同一线程。
相比之下,RxJava 的标准 Scheduler
s 使用单线程工作者,因此相同的底层线程将为 observeOn
中的操作提供服务。
当包装任意 Executor
时,RxJava 能做的最好的事情就是确保提交给 Scheduler.Worker
的任务不会重叠。
我有以下代码:
ExecutorService poolA = newFixedThreadPool(10, threadFactory("Sched-A-%d"));
Scheduler schedulerA = Schedulers.from(poolA);
ExecutorService poolB = newFixedThreadPool(10, threadFactory("Sched-B-%d"));
Scheduler schedulerB = Schedulers.from(poolB);
ExecutorService poolC = newFixedThreadPool(10, threadFactory("Sched-C-%d"));
Scheduler schedulerC = Schedulers.from(poolC);
private ThreadFactory threadFactory(String pattern) {
return new ThreadFactoryBuilder()
.setNameFormat(pattern).build();
}
@Test
public void testSubscribedOnObservedOn() {
log("Starting");
final Observable<String> obs = simple();
log("Created");
obs
.doOnNext(x -> log("Found 1: " + x))
.observeOn(schedulerB)
.doOnNext(x -> {Thread.sleep(100);log("Found 2: " + x);})
.observeOn(schedulerC)
.doOnNext(x -> log("Found 3: " + x))
.subscribeOn(schedulerA)
.subscribe(
x -> log("Got 1: " + x),
Throwable::printStackTrace,
() -> log("Completed")
);
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
log("Exiting");
}
最后 2 个运算符将在 schedulerC 上 运行。我希望只有一个线程用于此。但输出显示 2.
0 | main | Starting
72 | main | Created
135 | Sched-A-0 | Subscribed
136 | Sched-A-0 | Found 1: A
138 | Sched-A-0 | Found 1: B
239 | Sched-B-0 | Found 2: A
239 | Sched-C-0 | Found 3: A
240 | Sched-C-0 | Got 1: A
341 | Sched-B-0 | Found 2: B
341 | Sched-C-1 | Found 3: B
341 | Sched-C-1 | Got 1: B
341 | Sched-C-1 | Completed
3129 | main | Exiting
使用了 Sched-C-0、Sched-C-1。这种行为是否正确?
使用 Executors.newFixedThreadPool()
,您将获得一个可以对任务提交做出反应的线程池。在当前 运行 线程完成之前,第二个线程可能会更快地唤醒以服务更多工作。无法强制池重用其集合中的同一线程。
相比之下,RxJava 的标准 Scheduler
s 使用单线程工作者,因此相同的底层线程将为 observeOn
中的操作提供服务。
当包装任意 Executor
时,RxJava 能做的最好的事情就是确保提交给 Scheduler.Worker
的任务不会重叠。