Rxjava observable somtimes 错过一些项目时在不同的线程中 emtted

Rxjava observable somtimes miss some item when the items are emtted in different threads

使用“observeOn(Scheduler)”,Rxjava observable 似乎有时会在项目从不同线程发出时遗漏一些项目。

这是代码

val subject = PublishSubject.create<String>()

subject
    .observeOn(Schedulers.io())
    .subscribe { x: String? -> println(x) }

val t1 = Thread { subject.onNext("1") }
val t2 = Thread { subject.onNext("2") }

t1.start()
t2.start()

我期望的是控制台必须始终像下面这样打印

1
2

但有时结果只有

1

2

有什么原因吗?

此外)如果我删除“observeOn(Schedulers.io())”,结果与我的预期相同。

PublishSubjectonXXX 方法不是线程安全的,您必须以某种方式序列化对它的访问。最简单的方法是应用 toSerialized():

Calling onNext(Object), onError(Throwable) and onComplete() is required to be serialized (called from the same thread or called non-overlappingly from different threads through external means of serialization). The Subject.toSerialized() method available to all Subjects provides such serialization and also protects against reentrance (i.e., when a downstream Observer consuming this subject also wants to call onNext(Object) on this subject recursively).

val subject = PublishSubject.create<String>().toSerialized()