switchOnNext 运算符在上次插入可观察对象后不会为订阅发出
switchOnNext operator does not emit for subscriptions after last inserted observable
首先,一些背景知识(也许有更好的方法):
我们有一个模块可以在特定的 Observable 上发出传入的蓝牙消息。然后我们处理这些消息,最后在最后订阅转发消息。这种处理可能会在某个时间发生变化,对于大多数处理来说,这意味着重新创建中间 Observable 以及依赖它的所有 observable(因为它们现在将处理无效数据)。
我们想对其进行更改,以便重新创建处理的某些部分不需要重新创建依赖于它的所有内容,主要是这样我们就不必始终记住什么依赖于什么,并且还可以让操作员具有内部状态(如缓冲区、扫描或去抖动)不要丢失此内部状态。
有前途的解决方案:
通过使用 switchOnNext 运算符,我们可以解决这个问题。每当重新创建中间 observable 时,我们只需将其添加到 switchOnNext 的原点,订阅 switchOnNext 输出的任何人都会立即获得新结果。
问题:
如果 switchOnNext 之后的处理必须改变,它将停止获取结果,直到之前的 observable 改变。这意味着我们现在遇到了相反的问题。每当某些部分发生变化时,我们必须递归地重新创建它所依赖的一切。这稍微好一点(跟踪依赖什么比跟踪依赖它的所有东西要容易得多),但是可观察对象仍然会丢失内部状态,因为它们必须重新创建。
这种行为似乎违背了文档所说的应该发生的事情,但它没有明确说明这种或另一种方式。
示例代码:
此代码演示了问题。
import io.reactivex.Observable
import io.reactivex.subjects.PublishSubject
fun main() {
//Observable of observables
val publishSubject: PublishSubject<Observable<Int>> = PublishSubject.create()
//Observable to subscribe to get the most recent values
val observable: Observable<Int> = Observable.switchOnNext(publishSubject)
observable.subscribe { println("1: $it") }
//Now 1 is subscribed
val obsAux1 = PublishSubject.create<Int>()
observable.subscribe { println("2: $it") }
//Now 1 and 2 are subscribed
publishSubject.onNext(obsAux1)
observable.subscribe { println("3: $it") }
//Now 1, 2 and 3 are subscribed
//Should print out from subscriptions 1, 2 and 3, but only 1 and 2 printed
obsAux1.onNext(1)
val obsAux2 = PublishSubject.create<Int>()
publishSubject.onNext(obsAux2)
observable.subscribe { println("4: $it") }
//Now 1, 2, 3 and 4 are subscribed
//Should not print anything
obsAux1.onNext(2)
//Should print out from subscriptions 1, 2, 3 and 4, but only 1, 2 and 3 printed
obsAux2.onNext(3)
}
此代码的输出:
1: 1
2: 1
1: 3
2: 3
3: 3
预期输出:
1: 1
2: 1
3: 1 <--- This is missing
1: 3
2: 3
3: 3
4: 3 <--- This is missing
obsAux1 第一次发出时,所有三个订阅都应该打印,但只有在添加到 publishSubject 之前的订阅才会打印出来。
obsAux1 第二次发出时,应该不会打印任何内容,因为 obsAux2 已经插入。这按预期工作
obsAux2 第一次发出时,所有四个订阅都应该打印。第三个订阅按预期打印,这应该表明订阅工作正常。但是第四个订阅没有打印任何内容,因为它是在将 obsAux2 插入到 publishSubject 之后添加的。
你应该像这样使用 switchOnNext way.In 他们提到的文档
{@code switchOnNext} subscribes to an ObservableSource that emits
ObservableSources. Each time it observes one of these emitted
ObservableSources, the ObservableSource returned by {@code
switchOnNext} begins emitting the items emitted by that
ObservableSource. When a new ObservableSource is emitted, {@code
switchOnNext} stops emitting items from the earlier-emitted
ObservableSource and begins emitting items from the new one.
以下代码将给出您预期的 output.Based 逻辑,您将按此顺序更改它..
fun main()
{
val publishSubject: PublishSubject<Observable<Int>> = PublishSubject.create()
val observable: Observable<Int> = Observable.switchOnNext(publishSubject)
observable.subscribe { println("1: $it") }
val obsAux1 = PublishSubject.create<Int>()
observable.subscribe { println("2: $it") }
observable.subscribe { println("3: $it") }
publishSubject.onNext(obsAux1)
obsAux1.onNext(1)
val obsAux2 = PublishSubject.create<Int>()
publishSubject.onNext(obsAux2)
observable.subscribe { println("4: $it") }
obsAux1.onNext(2)
publishSubject.onNext(obsAux2)
obsAux2.onNext(3)
}
解决方案只是使用 BehaviourSubject 而不是 PublishSubject,至少对于可观察对象的可观察对象。
两者之间的区别在于,在新订阅时,PublishSubject 只会发出更多元素,而 BehaviourSubject 会立即发出最后一个元素并正常继续。
我仍然不同意它应该如何工作,但无论如何它解决了我们的问题。
代码,以防万一有人需要它(只需更改 main 的第一行和一个额外的导入):
import io.reactivex.subjects.BehaviorSubject
import io.reactivex.subjects.PublishSubject
fun main() {
//Observable of observables
val publishSubject: BehaviorSubject<Observable<Int>> = BehaviorSubject.create()
//Observable to subscribe to get the most recent values
val observable: Observable<Int> = Observable.switchOnNext(publishSubject)
observable.subscribe { println("1: $it") }
//Now 1 is subscribed
val obsAux1 = PublishSubject.create<Int>()
observable.subscribe { println("2: $it") }
//Now 1 and 2 are subscribed
publishSubject.onNext(obsAux1)
observable.subscribe { println("3: $it") }
//Now 1, 2 and 3 are subscribed
//Should print out from subscriptions 1, 2 and 3, but only 1 and 2 printed
obsAux1.onNext(1)
val obsAux2 = PublishSubject.create<Int>()
publishSubject.onNext(obsAux2)
observable.subscribe { println("4: $it") }
//Now 1, 2, 3 and 4 are subscribed
//Should not print anything
obsAux1.onNext(2)
//Should print out from subscriptions 1, 2, 3 and 4, but only 1, 2 and 3 printed
obsAux2.onNext(3)
}
首先,一些背景知识(也许有更好的方法):
我们有一个模块可以在特定的 Observable 上发出传入的蓝牙消息。然后我们处理这些消息,最后在最后订阅转发消息。这种处理可能会在某个时间发生变化,对于大多数处理来说,这意味着重新创建中间 Observable 以及依赖它的所有 observable(因为它们现在将处理无效数据)。
我们想对其进行更改,以便重新创建处理的某些部分不需要重新创建依赖于它的所有内容,主要是这样我们就不必始终记住什么依赖于什么,并且还可以让操作员具有内部状态(如缓冲区、扫描或去抖动)不要丢失此内部状态。
有前途的解决方案:
通过使用 switchOnNext 运算符,我们可以解决这个问题。每当重新创建中间 observable 时,我们只需将其添加到 switchOnNext 的原点,订阅 switchOnNext 输出的任何人都会立即获得新结果。
问题:
如果 switchOnNext 之后的处理必须改变,它将停止获取结果,直到之前的 observable 改变。这意味着我们现在遇到了相反的问题。每当某些部分发生变化时,我们必须递归地重新创建它所依赖的一切。这稍微好一点(跟踪依赖什么比跟踪依赖它的所有东西要容易得多),但是可观察对象仍然会丢失内部状态,因为它们必须重新创建。
这种行为似乎违背了文档所说的应该发生的事情,但它没有明确说明这种或另一种方式。
示例代码:
此代码演示了问题。
import io.reactivex.Observable
import io.reactivex.subjects.PublishSubject
fun main() {
//Observable of observables
val publishSubject: PublishSubject<Observable<Int>> = PublishSubject.create()
//Observable to subscribe to get the most recent values
val observable: Observable<Int> = Observable.switchOnNext(publishSubject)
observable.subscribe { println("1: $it") }
//Now 1 is subscribed
val obsAux1 = PublishSubject.create<Int>()
observable.subscribe { println("2: $it") }
//Now 1 and 2 are subscribed
publishSubject.onNext(obsAux1)
observable.subscribe { println("3: $it") }
//Now 1, 2 and 3 are subscribed
//Should print out from subscriptions 1, 2 and 3, but only 1 and 2 printed
obsAux1.onNext(1)
val obsAux2 = PublishSubject.create<Int>()
publishSubject.onNext(obsAux2)
observable.subscribe { println("4: $it") }
//Now 1, 2, 3 and 4 are subscribed
//Should not print anything
obsAux1.onNext(2)
//Should print out from subscriptions 1, 2, 3 and 4, but only 1, 2 and 3 printed
obsAux2.onNext(3)
}
此代码的输出:
1: 1
2: 1
1: 3
2: 3
3: 3
预期输出:
1: 1
2: 1
3: 1 <--- This is missing
1: 3
2: 3
3: 3
4: 3 <--- This is missing
obsAux1 第一次发出时,所有三个订阅都应该打印,但只有在添加到 publishSubject 之前的订阅才会打印出来。
obsAux1 第二次发出时,应该不会打印任何内容,因为 obsAux2 已经插入。这按预期工作
obsAux2 第一次发出时,所有四个订阅都应该打印。第三个订阅按预期打印,这应该表明订阅工作正常。但是第四个订阅没有打印任何内容,因为它是在将 obsAux2 插入到 publishSubject 之后添加的。
你应该像这样使用 switchOnNext way.In 他们提到的文档
{@code switchOnNext} subscribes to an ObservableSource that emits ObservableSources. Each time it observes one of these emitted ObservableSources, the ObservableSource returned by {@code switchOnNext} begins emitting the items emitted by that ObservableSource. When a new ObservableSource is emitted, {@code switchOnNext} stops emitting items from the earlier-emitted ObservableSource and begins emitting items from the new one.
以下代码将给出您预期的 output.Based 逻辑,您将按此顺序更改它..
fun main()
{
val publishSubject: PublishSubject<Observable<Int>> = PublishSubject.create()
val observable: Observable<Int> = Observable.switchOnNext(publishSubject)
observable.subscribe { println("1: $it") }
val obsAux1 = PublishSubject.create<Int>()
observable.subscribe { println("2: $it") }
observable.subscribe { println("3: $it") }
publishSubject.onNext(obsAux1)
obsAux1.onNext(1)
val obsAux2 = PublishSubject.create<Int>()
publishSubject.onNext(obsAux2)
observable.subscribe { println("4: $it") }
obsAux1.onNext(2)
publishSubject.onNext(obsAux2)
obsAux2.onNext(3)
}
解决方案只是使用 BehaviourSubject 而不是 PublishSubject,至少对于可观察对象的可观察对象。
两者之间的区别在于,在新订阅时,PublishSubject 只会发出更多元素,而 BehaviourSubject 会立即发出最后一个元素并正常继续。
我仍然不同意它应该如何工作,但无论如何它解决了我们的问题。
代码,以防万一有人需要它(只需更改 main 的第一行和一个额外的导入):
import io.reactivex.subjects.BehaviorSubject
import io.reactivex.subjects.PublishSubject
fun main() {
//Observable of observables
val publishSubject: BehaviorSubject<Observable<Int>> = BehaviorSubject.create()
//Observable to subscribe to get the most recent values
val observable: Observable<Int> = Observable.switchOnNext(publishSubject)
observable.subscribe { println("1: $it") }
//Now 1 is subscribed
val obsAux1 = PublishSubject.create<Int>()
observable.subscribe { println("2: $it") }
//Now 1 and 2 are subscribed
publishSubject.onNext(obsAux1)
observable.subscribe { println("3: $it") }
//Now 1, 2 and 3 are subscribed
//Should print out from subscriptions 1, 2 and 3, but only 1 and 2 printed
obsAux1.onNext(1)
val obsAux2 = PublishSubject.create<Int>()
publishSubject.onNext(obsAux2)
observable.subscribe { println("4: $it") }
//Now 1, 2, 3 and 4 are subscribed
//Should not print anything
obsAux1.onNext(2)
//Should print out from subscriptions 1, 2, 3 and 4, but only 1, 2 and 3 printed
obsAux2.onNext(3)
}