DisposableObserver dispose() 不工作(仍然收到更新)

DisposableObserver dispose() not working (updates still received)

即使在 MyFragment.javaonPause() 中调用 dispose() 之后,我的回调(请参阅下面的 onResume())仍然被调用。为什么?

我认为这不重要,但是:我从多个片段调用 NetworkUtils.subscribeToAvgPriceUpdates()(一次只有一个可见)。在每个片段中,我都有一个 DisposableObserver,然后当我切换到该片段时,我使用该观察者订阅数据更新。

NetworkUtils.java:

public static void subscribeToAvgPriceUpdates(DisposableObserver<List<Result>> observer, CoinPriceUpdateCallback callback) {
    if(observer != null && !observer.isDisposed())
        observer.dispose();

    observer = new DisposableObserver<List<Result>>() {
        @Override
        public void onNext(List<Result> results) {
            callback.onUpdated(results);
        }

        @Override
        public void onError(Throwable e) {
            e.printStackTrace();
        }

        @Override
        public void onComplete() { }
    };

    MainApplication.apiProvider.getAPI().getAllTickersRx()
            .repeatWhen(objectObservable -> objectObservable.delay(15000, TimeUnit.MILLISECONDS) )
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(observer);
}

MyFragment.java:

private DisposableObserver<List<Result>> tickerUpdateObserver;

@Override
public void onPause () {
    if(tickerUpdateObserver != null)
        tickerUpdateObserver.dispose();

    super.onPause();
}

@Override
public void onResume() {
    super.onResume();

    NetworkUtils.subscribeToAvgPriceUpdates(tickerUpdateObserver, results -> {
        // still getting called even after I switch to another fragment, why?
        // shouldn't .dispose() in onPause() stop the updates?
    });
}

问题是您在方法中创建了一个新的 DisposableObserver,但字段 tickerUpdateObserver 保持相同的第一个实例,因此您没有对较新观察者的引用。

您可以 return 方法中的新 DisposableObserver 并更新字段:

public static DisposableObserver<List<Result>> subscribeToAvgPriceUpdates(
        DisposableObserver<List<Result>> observer, 
        CoinPriceUpdateCallback callback) {
    if(observer != null && !observer.isDisposed())
        observer.dispose();

    observer = new DisposableObserver<List<Result>>() {
        @Override
        public void onNext(List<Result> results) {
            callback.onUpdated(results);
        }

        @Override
        public void onError(Throwable e) {
            e.printStackTrace();
        }

        @Override
        public void onComplete() { }
    };

    MainApplication.apiProvider.getAPI().getAllTickersRx()
        .repeatWhen(objectObservable -> 
            objectObservable.delay(15000, TimeUnit.MILLISECONDS) )
        .subscribeOn(Schedulers.io())
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe(observer);

    return observer;
}

@Override
public void onResume() {
    super.onResume();

    tickerUpdateObserver = NetworkUtils.subscribeToAvgPriceUpdates(
            tickerUpdateObserver, results -> {
        // still getting called even after I switch to another fragment, why?
        // shouldn't .dispose() in onPause() stop the updates?
    });
}