是否有可能在 Rx java kotlin 中删除超时或在成功时中止观察

Is it possible in Rx java kotlin to remove timeout or abort observable on success

我有以下代码,当 5 分钟后成功收到 onNext 时,我收到 onError,因为有超时,所以我如何禁用超时或中止一次性不在 [=13] 之后调用 doSomethingElseUnSuccesfull() =]被称为

getObservable().timeout(5, TimeUnit.MINUTES)
            .onErrorReturnItem(Optional.of(false))
            .distinctUntilChanged()
            .subscribe {
                if (it.get()) {
                    doSomethingSuccesfull()
                    //Remove timer or abort disposable
                } else {
                     doSomethingElseUnSuccesfull()
                   }
            }.also { disposableList.add(it) }

disposableList = CompositeDisposable()有几种不同的一次性

我不知道你想做什么,但如果你只需要做一个请求,那么你应该使用 Single 而不是 Observable

如果不是这种情况,您可以尝试使用 .doOnNext{} 并将您的一次性实例也放在一个变量中,以便在您需要时处理它,类似这样的事情:

var myDisposable: Disposable? = null
var shouldDispose = false

getObservable().timeout(5, TimeUnit.MINUTES)
            .onErrorReturnItem(Optional.of(false))
            .distinctUntilChanged()
            .doOnNext {
                if (shouldDispose) {
                    myDisposable?.dispose()
                    myDisposable = null
                    shouldDispose = false
                }
            }
            .subscribe {
                if (it.get()) {
                    doSomethingSuccesfull()
                    shouldDispose = true
                } else {
                     doSomethingElseUnSuccesfull()
                   }
            }.also {
              disposableList.add(it)
              myDisposable = it
            }