rxjava定时器在订阅成功执行后抛出超时异常

rxjava timer throws timeout exception after subscribe has successfully executed

我有以下代码:

repo.getObservable()
            .timeout(1, TimeUnit.MINUTES)
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .doOnSubscribe {
                _isInProgress.value = true
            }
            .doFinally {
                _isInProgress.value = false
            }
            .subscribe(
                    {
                        Timber.d("Success")
                    },
                    {

                        Timber.e(it)
                    })
            .trackDisposable()

问题是我在几秒钟后成功收到成功消息,但我的预加载器仍然等待 1 分钟,然后我的订阅错误部分被执行。这是预期的行为吗?如果订阅的成功部分被执行,我能做些什么来停止超时?

P. S. 从 getObservable() 返回的 Observable 是这样创建的:PublishSubject.create()

那一定是因为您没有在 PublishSubject 上调用 onComplete,而只调用了 onNext。 看这个例子:

PublishSubject<Integer> source = PublishSubject.create();

// It will get 1, 2, 3, 4 and onComplete
source.subscribe(getFirstObserver()); 

source.onNext(1);
source.onNext(2);
source.onNext(3);

// It will get 4 and onComplete for second observer also.
source.subscribe(getSecondObserver());

source.onNext(4);
source.onComplete();

在调用 onComplete 之前,观察者正在等待更多结果。 您可以在接收到您等待的结果后 unsubscribe/dispose 或在发送所有结果后在 Observable 上调用 onComplete

如果您需要一个结果,请在 timeout 之前或之后使用 take(1):

repo.getObservable()
        .take(1) // <---------------------------------------------
        .timeout(1, TimeUnit.MINUTES)
        .subscribeOn(Schedulers.io())
        .observeOn(AndroidSchedulers.mainThread())
        .doOnSubscribe {
            _isInProgress.value = true
        }
        .doFinally {
            _isInProgress.value = false
        }
        .subscribe(
                {
                    Timber.d("Success")
                },
                {

                    Timber.e(it)
                })
        .trackDisposable()