手动停止 takeUntil() 中的 Observable.timer()
Stop Observable.timer() inside takeUntil() manually
案例:在接下来的5秒内观察到一些物品,但我也想手动终止获取物品。
我有这个代码
private var disposable: Disposable? = null
...
observableThatHasToBeAliveAllTime
.switchMap {
observableThatEmitsItemOver5SecsWhenUpperObsEmits()
.takeUntil(
Observable.timer(5, TimeUnit.SECONDS)
.also { disposable = it.subscribe() }
)
.switchMap { /* some work */ }
}
.subscribe { /* handle result */ }
在其他地方我调用 disposable?.dispose
,但 takeUntil() 继续工作
我做错了什么?
What am I doing wrong?
it.subscribe()
订阅 timer
与 takeUntil
完全分开,没有任何效果。此外 takeUntil
需要来自其他来源的 onNext
或 onComplete
信号,因此 dispose 也不会按预期工作。
您可以使用一个单独的主题和另一个 takeUntil
来停止流程:
var stop = PublishSubject.create<Object>()
observableThatHasToBeAliveAllTime
.switchMap {
observableThatEmitsItemOver5SecsWhenUpperObsEmits()
.takeUntil(
Observable.timer(5, TimeUnit.SECONDS)
)
.takeUntil(stop)
.switchMap { /* some work */ }
}
.subscribe { /* handle result */ }
stop.onNext("Stop!");
案例:在接下来的5秒内观察到一些物品,但我也想手动终止获取物品。
我有这个代码
private var disposable: Disposable? = null
...
observableThatHasToBeAliveAllTime
.switchMap {
observableThatEmitsItemOver5SecsWhenUpperObsEmits()
.takeUntil(
Observable.timer(5, TimeUnit.SECONDS)
.also { disposable = it.subscribe() }
)
.switchMap { /* some work */ }
}
.subscribe { /* handle result */ }
在其他地方我调用 disposable?.dispose
,但 takeUntil() 继续工作
我做错了什么?
What am I doing wrong?
it.subscribe()
订阅 timer
与 takeUntil
完全分开,没有任何效果。此外 takeUntil
需要来自其他来源的 onNext
或 onComplete
信号,因此 dispose 也不会按预期工作。
您可以使用一个单独的主题和另一个 takeUntil
来停止流程:
var stop = PublishSubject.create<Object>()
observableThatHasToBeAliveAllTime
.switchMap {
observableThatEmitsItemOver5SecsWhenUpperObsEmits()
.takeUntil(
Observable.timer(5, TimeUnit.SECONDS)
)
.takeUntil(stop)
.switchMap { /* some work */ }
}
.subscribe { /* handle result */ }
stop.onNext("Stop!");