何时在 RxJava 中使用 blockingSubscribe

When to use blockingSubscribe in RxJava

在什么情况下需要在RxJava中使用blockingSubscribe?

像下面这样,可以打印出结果。

Observable.just(1, 2, 3, 4, 5)
            .subscribe(line -> System.out.println(line));

虽然如果改成下面这样,它不会打印结果,但为什么会这样呢?

Observable.interval(500, TimeUnit.MILLISECONDS)
            .subscribe(line -> System.out.println(line));

如果改为使用blockingSubscribe,它可以再次显示结果。

Observable.interval(500, TimeUnit.MILLISECONDS)
            .blockingSubscribe(line -> System.out.println(line));

我读到"blockingSubscribe"是阻塞主线程,但是为什么要阻塞主线程(就像在现实世界的情况下),如果不想阻塞主线程执行怎么办但仍想使用 Observable.interval ?

第一个示例 运行 在订阅线程的堆栈中。当您订阅时,调用线程会发生 subscribeAcutal。 just 操作员然后将 onNext 所有项目 (1,2,3,4,5) 同步给订阅者。在每个 onNext 上,将调用 Subscribe 并打印号码。

Observable.just(1, 2, 3, 4, 5)
            .subscribe(line -> System.out.println(line));

第二个例子不会打印任何东西,因为涉及到并发和你的main方法"falls through"。在幕后,Interval-Operator 使用调度程序。在订阅时,Interval 运算符将向调度程序添加一个作业,它将在 500 毫秒后 运行。调用线程实际调用订阅,操作员将作业添加到调度程序并完成。堆栈上没有要弹出的堆栈帧。因此 subscribe-Method 调用完成并且调用(主)线程能够继续。在你的情况下,没有更多的方法可以调用,因此程序退出。 programm 将比 emit 退出得更快,这将在 500 毫秒内发生。这就是为什么您在 System.out.

上看不到任何输出的原因
Observable.interval(500, TimeUnit.MILLISECONDS)
            .subscribe(line -> System.out.println(line));

如果你想在 singla 被打印出来之前阻止你的主线程退出,你可以添加一个

Thread.sleep(1_000)

主线程将在 Thread#sleep 处停止 1000 毫秒。当主线程被阻塞时,调度程序将在另一个线程上发出一个值,然后该线程将调用订阅者的 onNext。将调用 println 调用并打印值。

您在这里尝试实现的是同步。主线程与另一个。这是阻塞* - 订阅开始发挥作用。如果你想将一个线程与另一个线程连接起来,你必须阻塞线程直到特定信号到达。

what if doesn't want to block the main thread execution but still want to use Observable.interval

您的应用程序中执行的主线程不允许完成。

如何实现?

只需创建/启动一个非守护线程(例如 Android、SWT、JavaFX)^1

^1 Main method won't return