手动停止 takeUntil() 中的 Observable.timer()

Stop Observable.timer() inside takeUntil() manually

案例:在接下来的5秒内观察到一些物品,但我也想手动终止获取物品。

我有这个代码


private var disposable: Disposable? = null

...

observableThatHasToBeAliveAllTime
    .switchMap {
        observableThatEmitsItemOver5SecsWhenUpperObsEmits()
            .takeUntil(
                Observable.timer(5, TimeUnit.SECONDS)
                    .also { disposable = it.subscribe() }
            )
            .switchMap { /* some work */ }
    }
    .subscribe { /* handle result */ }

在其他地方我调用 disposable?.dispose,但 takeUntil() 继续工作

我做错了什么?

What am I doing wrong?

it.subscribe() 订阅 timertakeUntil 完全分开,没有任何效果。此外 takeUntil 需要来自其他来源的 onNextonComplete 信号,因此 dispose 也不会按预期工作。

您可以使用一个单独的主题和另一个 takeUntil 来停止流程:

var stop = PublishSubject.create<Object>()

observableThatHasToBeAliveAllTime
    .switchMap {
        observableThatEmitsItemOver5SecsWhenUpperObsEmits()
            .takeUntil(
                Observable.timer(5, TimeUnit.SECONDS)
            )
            .takeUntil(stop)
            .switchMap { /* some work */ }
    }
    .subscribe { /* handle result */ }

stop.onNext("Stop!");