RxJava2 BehaviorSubject 上的多个侦听器结果不一致
Multiple Listeners on RxJava2 BehaviorSubject with Inconsistent Results
我有一个 BehaviorSubject,它具有三个在任何发射之前订阅的侦听器。我.onNext()
两件事:A其次是B。
两个听众适当地接收到 A,然后接收到 B。但是第三个听众接收到 B、A。有什么可能解释这种行为?这都在同一个线程上。
这是一些重现结果的示例代码(在 Kotlin 中)。如果您需要 Java 版本,请告诉我:
@Test
fun `rxjava test`() {
val eventHistory1 = ArrayList<String>()
val eventHistory2 = ArrayList<String>()
val eventHistory3 = ArrayList<String>()
val behaviorSubject = BehaviorSubject.create<String>()
behaviorSubject.subscribe {
eventHistory1.add(it)
}
behaviorSubject.subscribe {
eventHistory2.add(it)
if (it == "A") behaviorSubject.onNext("B")
}
behaviorSubject.subscribe {
eventHistory3.add(it)
}
behaviorSubject.onNext("A")
println(eventHistory1)
println(eventHistory2)
println(eventHistory3)
assert(eventHistory1 == eventHistory2)
assert(eventHistory2 == eventHistory3)
}
这是测试的输出:
[A, B]
[A, B]
[B, A]
主题不可重入,因此在当前服务于 onNexts 的同一主题上调用 onNext 是未定义的行为。 javadoc 警告这种情况:
Calling onNext(Object), onError(Throwable) and onComplete() is required to be serialized (called from the same thread or called non-overlappingly from different threads through external means of serialization). The Subject.toSerialized() method available to all Subjects provides such serialization and also protects against reentrance (i.e., when a downstream Observer consuming this subject also wants to call onNext(Object) on this subject recursively).
在您的特定情况下,第 3 个观察者首先发出“B”信号,而它正要向它发出“A”信号,因此交换了顺序。
在主题上使用 toSerialized
以确保不会发生这种情况。
val behaviorSubject = BehaviorSubject.create<String>().toSerialized()
我有一个 BehaviorSubject,它具有三个在任何发射之前订阅的侦听器。我.onNext()
两件事:A其次是B。
两个听众适当地接收到 A,然后接收到 B。但是第三个听众接收到 B、A。有什么可能解释这种行为?这都在同一个线程上。
这是一些重现结果的示例代码(在 Kotlin 中)。如果您需要 Java 版本,请告诉我:
@Test
fun `rxjava test`() {
val eventHistory1 = ArrayList<String>()
val eventHistory2 = ArrayList<String>()
val eventHistory3 = ArrayList<String>()
val behaviorSubject = BehaviorSubject.create<String>()
behaviorSubject.subscribe {
eventHistory1.add(it)
}
behaviorSubject.subscribe {
eventHistory2.add(it)
if (it == "A") behaviorSubject.onNext("B")
}
behaviorSubject.subscribe {
eventHistory3.add(it)
}
behaviorSubject.onNext("A")
println(eventHistory1)
println(eventHistory2)
println(eventHistory3)
assert(eventHistory1 == eventHistory2)
assert(eventHistory2 == eventHistory3)
}
这是测试的输出:
[A, B]
[A, B]
[B, A]
主题不可重入,因此在当前服务于 onNexts 的同一主题上调用 onNext 是未定义的行为。 javadoc 警告这种情况:
Calling onNext(Object), onError(Throwable) and onComplete() is required to be serialized (called from the same thread or called non-overlappingly from different threads through external means of serialization). The Subject.toSerialized() method available to all Subjects provides such serialization and also protects against reentrance (i.e., when a downstream Observer consuming this subject also wants to call onNext(Object) on this subject recursively).
在您的特定情况下,第 3 个观察者首先发出“B”信号,而它正要向它发出“A”信号,因此交换了顺序。
在主题上使用 toSerialized
以确保不会发生这种情况。
val behaviorSubject = BehaviorSubject.create<String>().toSerialized()