在 RxJava 映射中阻塞无法完成

Blocking inside RxJava map fails to complete

以下(使用 RxJava 1.2.4 的错误方式)代码无法解锁并且永远不会完成。

Scheduler scheduler = Schedulers.computation();
Observable.range(0, 100).map(i -> {
    System.out.println("onNext " + i);
    return Observable.just(i).subscribeOn(scheduler).toBlocking().single();
}).subscribeOn(scheduler).toBlocking().subscribe();
System.out.println("finished");

如果将第一行更改为固定线程池,它就会结束。

Scheduler scheduler = Schedulers.from(Executors.newFixedThreadPool(8));

导致第一个示例无法运行的计算调度程序有何特殊之处?

不要那样做

请注意有关计算调度程序的文档说明:

This can be used for event-loops, processing callbacks and other computational work. Do not perform IO-bound work on this scheduler.

他们想说:不要在此调度程序上执行任何阻塞活动

所以你的做法是违法的,但也算是一个很好的示范。

为什么会发生死锁

同样的死锁发生在RxJava 2(此时是2.0.4)

它的发生是由于 computation 调度程序的实现方式。它创建固定数量的 single-thread 工人(这些工人的数量是 CPU 核心的数量;在我的例子中是 4 个)。它给这些工人分配任务的方式很简单round-robin。现在让我们看看在您的示例中将哪些任务分配给了哪些工作人员。

  • worker 1 <- subscribe() 调用调用循环在 range 中生成整数;请注意,直到所有值都通过下游才能完成此任务
  • worker 2 <- just(0)...toBlocking().single() 为第一个生成的整数;这个立即完成,没有真正的阻塞,因为值已经可用
  • worker 3 <- just(1)...toBlocking().single() 用于第二个生成的整数;这个立即完成
  • worker 4 <- just(2)...toBlocking().single() 为第三个生成的整数;这个立即完成

此时我们有 worker 1 仍在忙于执行 range 循环,worker 2-4 空闲。下一个任务来自循环,根据 round-robin:

分配给 worker1
  • worker 1 <- just(3)...toBlocking().single() 为第四个生成的整数;这个会排队,而 worker 1 循环卡住等待结果。这里是死锁。

FixedThreadPool 调度程序不会锁定,因为它将任务分配给可用线程,而不是 round-robin 方式。只要确保它有 1 个以上的线程。

阻塞是邪恶的

通常您应该避免在 Rx 管道中进行阻塞操作。 Rx 提供了很好的工具来执行异步任务而不阻塞。您可以使用 flatMap 而不是 map,例如:

Scheduler scheduler = Schedulers.computation();
Observable.range(0, 100).flatMap(i -> {
    System.out.println("onNext " + i);
    return Observable.just(i).subscribeOn(scheduler);
}).subscribeOn(scheduler).toBlocking().subscribe();
System.out.println("finished");

这甚至可以使用单线程调度程序。

您可以像这样 Observable.fromFuture(asyncService(i)).

调用真正的异步任务,而不是 Observable.just(i)

如果有必要,请使用 concatMap 而不是 flatMap 来保留项目的顺序。