RxJava2 BehaviorSubject 上的多个侦听器结果不一致

Multiple Listeners on RxJava2 BehaviorSubject with Inconsistent Results

我有一个 BehaviorSubject,它具有三个在任何发射之前订阅的侦听器。我.onNext()两件事:A其次是B。

两个听众适当地接收到 A,然后接收到 B。但是第三个听众接收到 B、A。有什么可能解释这种行为?这都在同一个线程上。

这是一些重现结果的示例代码(在 Kotlin 中)。如果您需要 Java 版本,请告诉我:

    @Test
    fun `rxjava test`() {
        val eventHistory1 = ArrayList<String>()
        val eventHistory2 = ArrayList<String>()
        val eventHistory3 = ArrayList<String>()

        val behaviorSubject = BehaviorSubject.create<String>()

        behaviorSubject.subscribe {
            eventHistory1.add(it)
        }

        behaviorSubject.subscribe {
            eventHistory2.add(it)
            if (it == "A") behaviorSubject.onNext("B")
        }

        behaviorSubject.subscribe {
            eventHistory3.add(it)
        }

        behaviorSubject.onNext("A")

        println(eventHistory1)
        println(eventHistory2)
        println(eventHistory3)

        assert(eventHistory1 == eventHistory2)
        assert(eventHistory2 == eventHistory3)
    }


这是测试的输出:

[A, B]
[A, B]
[B, A]

主题不可重入,因此在当前服务于 onNexts 的同一主题上调用 onNext 是未定义的行为。 javadoc 警告这种情况:

Calling onNext(Object), onError(Throwable) and onComplete() is required to be serialized (called from the same thread or called non-overlappingly from different threads through external means of serialization). The Subject.toSerialized() method available to all Subjects provides such serialization and also protects against reentrance (i.e., when a downstream Observer consuming this subject also wants to call onNext(Object) on this subject recursively).

在您的特定情况下,第 3 个观察者首先发出“B”信号,而它正要向它发出“A”信号,因此交换了顺序。

在主题上使用 toSerialized 以确保不会发生这种情况。

val behaviorSubject = BehaviorSubject.create<String>().toSerialized()