Rxjava observable somtimes 错过一些项目时在不同的线程中 emtted
Rxjava observable somtimes miss some item when the items are emtted in different threads
使用“observeOn(Scheduler)”,Rxjava observable 似乎有时会在项目从不同线程发出时遗漏一些项目。
这是代码
val subject = PublishSubject.create<String>()
subject
.observeOn(Schedulers.io())
.subscribe { x: String? -> println(x) }
val t1 = Thread { subject.onNext("1") }
val t2 = Thread { subject.onNext("2") }
t1.start()
t2.start()
我期望的是控制台必须始终像下面这样打印
1
2
但有时结果只有
1
或
2
有什么原因吗?
此外)如果我删除“observeOn(Schedulers.io())”,结果与我的预期相同。
PublishSubject 的 onXXX
方法不是线程安全的,您必须以某种方式序列化对它的访问。最简单的方法是应用 toSerialized()
:
Calling onNext(Object), onError(Throwable) and onComplete() is required to be serialized (called from the same thread or called non-overlappingly from different threads through external means of serialization). The Subject.toSerialized() method available to all Subjects provides such serialization and also protects against reentrance (i.e., when a downstream Observer consuming this subject also wants to call onNext(Object) on this subject recursively).
val subject = PublishSubject.create<String>().toSerialized()
使用“observeOn(Scheduler)”,Rxjava observable 似乎有时会在项目从不同线程发出时遗漏一些项目。
这是代码
val subject = PublishSubject.create<String>()
subject
.observeOn(Schedulers.io())
.subscribe { x: String? -> println(x) }
val t1 = Thread { subject.onNext("1") }
val t2 = Thread { subject.onNext("2") }
t1.start()
t2.start()
我期望的是控制台必须始终像下面这样打印
1
2
但有时结果只有
1
或
2
有什么原因吗?
此外)如果我删除“observeOn(Schedulers.io())”,结果与我的预期相同。
PublishSubject 的 onXXX
方法不是线程安全的,您必须以某种方式序列化对它的访问。最简单的方法是应用 toSerialized()
:
Calling onNext(Object), onError(Throwable) and onComplete() is required to be serialized (called from the same thread or called non-overlappingly from different threads through external means of serialization). The Subject.toSerialized() method available to all Subjects provides such serialization and also protects against reentrance (i.e., when a downstream Observer consuming this subject also wants to call onNext(Object) on this subject recursively).
val subject = PublishSubject.create<String>().toSerialized()