RxJava 异步观察器遗漏元素

RxJava asynchronous observer misses elements

我正在使用 observeOn 来观察另一个线程中的可观察对象:

Observable.just("Hello", "world!").observeOn(Schedulers.io()).subscribe(System.out::println);

但是这段代码不会总是输出"Hello world!"。 PublishSubject 也会发生同样的情况:

PublishSubject<String> subject = PublishSubject.create();
subject.observeOn(Schedulers.io()).subscribe(System.out::println);
subject.onNext("Hello");
subject.onNext("world!");

为什么这段代码并不总是打印 "Hello world!"?我认为至少在第二个示例中订阅会收到两条消息,因为它在 onNext 调用之前订阅。有没有办法同时接收这两条消息?

IO 调度程序需要一些时间才能准备就绪。

当您使用 onNext 发出项目时,它还没有准备好,所以您错过了。

RxJava 调度程序的底层线程都是守护线程,如果所有其他非守护线程都已完成,它们将被 JVM 停止。如果您 运行 来自 static void main() 的示例,您的 main() 方法很可能会在其他线程有机会 运行ning 之前终止,因此打印代码不会已执行。

根据您想要观察序列的方式,您可以在 Observable 链上使用 toBlocking() 或在 onCompleted() 方法实现中使用 CountDownLatchcountDown() .