在 RxJava 映射中阻塞无法完成
Blocking inside RxJava map fails to complete
以下(使用 RxJava 1.2.4 的错误方式)代码无法解锁并且永远不会完成。
Scheduler scheduler = Schedulers.computation();
Observable.range(0, 100).map(i -> {
System.out.println("onNext " + i);
return Observable.just(i).subscribeOn(scheduler).toBlocking().single();
}).subscribeOn(scheduler).toBlocking().subscribe();
System.out.println("finished");
如果将第一行更改为固定线程池,它就会结束。
Scheduler scheduler = Schedulers.from(Executors.newFixedThreadPool(8));
导致第一个示例无法运行的计算调度程序有何特殊之处?
不要那样做
请注意有关计算调度程序的文档说明:
This can be used for event-loops, processing callbacks and other computational work. Do not perform IO-bound work on this scheduler.
他们想说:不要在此调度程序上执行任何阻塞活动。
所以你的做法是违法的,但也算是一个很好的示范。
为什么会发生死锁
同样的死锁发生在RxJava 2(此时是2.0.4)
它的发生是由于 computation
调度程序的实现方式。它创建固定数量的 single-thread 工人(这些工人的数量是 CPU 核心的数量;在我的例子中是 4 个)。它给这些工人分配任务的方式很简单round-robin。现在让我们看看在您的示例中将哪些任务分配给了哪些工作人员。
- worker 1 <-
subscribe()
调用调用循环在 range
中生成整数;请注意,直到所有值都通过下游才能完成此任务
- worker 2 <-
just(0)...toBlocking().single()
为第一个生成的整数;这个立即完成,没有真正的阻塞,因为值已经可用
- worker 3 <-
just(1)...toBlocking().single()
用于第二个生成的整数;这个立即完成
- worker 4 <-
just(2)...toBlocking().single()
为第三个生成的整数;这个立即完成
此时我们有 worker 1 仍在忙于执行 range
循环,worker 2-4 空闲。下一个任务来自循环,根据 round-robin:
分配给 worker1
- worker 1 <-
just(3)...toBlocking().single()
为第四个生成的整数;这个会排队,而 worker 1 循环卡住等待结果。这里是死锁。
FixedThreadPool
调度程序不会锁定,因为它将任务分配给可用线程,而不是 round-robin 方式。只要确保它有 1 个以上的线程。
阻塞是邪恶的
通常您应该避免在 Rx 管道中进行阻塞操作。 Rx 提供了很好的工具来执行异步任务而不阻塞。您可以使用 flatMap
而不是 map
,例如:
Scheduler scheduler = Schedulers.computation();
Observable.range(0, 100).flatMap(i -> {
System.out.println("onNext " + i);
return Observable.just(i).subscribeOn(scheduler);
}).subscribeOn(scheduler).toBlocking().subscribe();
System.out.println("finished");
这甚至可以使用单线程调度程序。
您可以像这样 Observable.fromFuture(asyncService(i))
.
调用真正的异步任务,而不是 Observable.just(i)
如果有必要,请使用 concatMap
而不是 flatMap
来保留项目的顺序。
以下(使用 RxJava 1.2.4 的错误方式)代码无法解锁并且永远不会完成。
Scheduler scheduler = Schedulers.computation();
Observable.range(0, 100).map(i -> {
System.out.println("onNext " + i);
return Observable.just(i).subscribeOn(scheduler).toBlocking().single();
}).subscribeOn(scheduler).toBlocking().subscribe();
System.out.println("finished");
如果将第一行更改为固定线程池,它就会结束。
Scheduler scheduler = Schedulers.from(Executors.newFixedThreadPool(8));
导致第一个示例无法运行的计算调度程序有何特殊之处?
不要那样做
请注意有关计算调度程序的文档说明:
This can be used for event-loops, processing callbacks and other computational work. Do not perform IO-bound work on this scheduler.
他们想说:不要在此调度程序上执行任何阻塞活动。
所以你的做法是违法的,但也算是一个很好的示范。
为什么会发生死锁
同样的死锁发生在RxJava 2(此时是2.0.4)
它的发生是由于 computation
调度程序的实现方式。它创建固定数量的 single-thread 工人(这些工人的数量是 CPU 核心的数量;在我的例子中是 4 个)。它给这些工人分配任务的方式很简单round-robin。现在让我们看看在您的示例中将哪些任务分配给了哪些工作人员。
- worker 1 <-
subscribe()
调用调用循环在range
中生成整数;请注意,直到所有值都通过下游才能完成此任务 - worker 2 <-
just(0)...toBlocking().single()
为第一个生成的整数;这个立即完成,没有真正的阻塞,因为值已经可用 - worker 3 <-
just(1)...toBlocking().single()
用于第二个生成的整数;这个立即完成 - worker 4 <-
just(2)...toBlocking().single()
为第三个生成的整数;这个立即完成
此时我们有 worker 1 仍在忙于执行 range
循环,worker 2-4 空闲。下一个任务来自循环,根据 round-robin:
- worker 1 <-
just(3)...toBlocking().single()
为第四个生成的整数;这个会排队,而 worker 1 循环卡住等待结果。这里是死锁。
FixedThreadPool
调度程序不会锁定,因为它将任务分配给可用线程,而不是 round-robin 方式。只要确保它有 1 个以上的线程。
阻塞是邪恶的
通常您应该避免在 Rx 管道中进行阻塞操作。 Rx 提供了很好的工具来执行异步任务而不阻塞。您可以使用 flatMap
而不是 map
,例如:
Scheduler scheduler = Schedulers.computation();
Observable.range(0, 100).flatMap(i -> {
System.out.println("onNext " + i);
return Observable.just(i).subscribeOn(scheduler);
}).subscribeOn(scheduler).toBlocking().subscribe();
System.out.println("finished");
这甚至可以使用单线程调度程序。
您可以像这样 Observable.fromFuture(asyncService(i))
.
Observable.just(i)
如果有必要,请使用 concatMap
而不是 flatMap
来保留项目的顺序。