订阅的结果没有被使用

The result of subscribe is not used

我今天已经升级到 Android Studio 3.1,它似乎增加了一些 lint 检查。这些 lint 检查之一是针对未存储在变量中的一次性 RxJava2 subscribe() 调用。例如,从我的房间数据库中获取所有玩家的列表:

Single.just(db)
            .subscribeOn(Schedulers.io())
            .subscribe(db -> db.playerDao().getAll());

产生一个大的黄色块和这个工具提示:

The result of subscribe is not used

像这样的一次性 Rx 调用的最佳做法是什么?我应该保留 Disposabledispose() 完成吗?还是我应该 @SuppressLint 继续前进?

这似乎只影响 RxJava2 (io.reactivex),RxJava (rx) 没有这个 lint。

IDE 不知道您的订阅在未处理时可能产生什么潜在影响,因此将其视为潜在不安全。例如,您的 Single 可能包含网络调用,如果您的 Activity 在执行期间被放弃,则可能会导致内存泄漏。

管理大量 Disposable 的一种便捷方法是使用 CompositeDisposable;只需在封闭的 class 中创建一个新的 CompositeDisposable 实例变量,然后将所有 Disposables 添加到 CompositeDisposable(使用 RxKotlin,您只需将 addTo(compositeDisposable) 附加到所有 Disposables)。最后,当您完成实例后,请调用 compositeDisposable.dispose()

这将消除 lint 警告,并确保您的 Disposables 得到妥善管理。

在这种情况下,代码如下所示:

CompositeDisposable compositeDisposable = new CompositeDisposable();

Disposable disposable = Single.just(db)
        .subscribeOn(Schedulers.io())
        .subscribe(db -> db.get(1)));

compositeDisposable.add(disposable); //IDE is satisfied that the Disposable is being managed. 
disposable.addTo(compositeDisposable); //Alternatively, use this RxKotlin extension function.


compositeDisposable.dispose(); //Placed wherever we'd like to dispose our Disposables (i.e. in onDestroy()).

Activity 被销毁的那一刻,一次性物品列表被清除,我们很好。

io.reactivex.disposables.CompositeDisposable mDisposable;

    mDisposable = new CompositeDisposable();

    mDisposable.add(
            Single.just(db)
                    .subscribeOn(Schedulers.io())
                    .subscribe(db -> db.get(1)));

    mDisposable.dispose(); // dispose wherever is required

如果您确定一次性处理正确,例如使用 doOnSubscribe() 运算符,您可以将此添加到 Gradle:

android {
lintOptions {
     disable 'CheckResult'
}}

正如所建议的那样,您可以使用一些全局 CompositeDisposable 将订阅操作的结果添加到那里。

RxJava2Extensions library contains useful methods to automatically remove created disposable from the CompositeDisposable when it completes. See subscribeAutoDispose 部分。

在你的情况下它可能看起来像这样

SingleConsumers.subscribeAutoDispose(
    Single.just(db)
            .subscribeOn(Schedulers.io()),
    composite,
    db -> db.playerDao().getAll())

您可以订阅 DisposableSingleObserver:

Single.just(db)
    .subscribeOn(Schedulers.io())
    .subscribe(new DisposableSingleObserver<Object>() {
            @Override
            public void onSuccess(Object obj) {
                // work with the resulting todos...
                dispose();
            }

            @Override
            public void onError(Throwable e) {
                // handle the error case...
                dispose();
            }});

如果您需要直接处理 Single 对象(例如在它发出之前),您可以实现方法 onSubscribe(Disposable d) 来获取和使用 Disposable 引用。

您也可以自己实现SingleObserver接口或者使用其他子类。

您可以使用优步 AutoDispose and rxjava .as

        Single.just(db)
            .subscribeOn(Schedulers.io())
            .as(AutoDispose.autoDisposable(AndroidLifecycleScopeProvider.from(this)))
            .subscribe(db -> db.playerDao().getAll());

确保您了解何时根据 ScopeProvider 退订。

我发现自己一次又一次地回到如何正确处理订阅的问题上,尤其是这篇文章。一些博客和演讲声称未能调用 dispose 必然会导致内存泄漏,我认为这种说法过于笼统。据我了解,关于不存储 subscribe 结果的 lint 警告在某些情况下不是问题,因为:

  • 并非所有可观察对象 运行 在 Android activity
  • 的上下文中
  • observable 可以是同步的
  • 如果可观察完成,则隐式调用 Dispose

因为我不想抑制 lint 警告,所以我最近开始将以下模式用于具有同步可观察对象的情况:

var disposable: Disposable? = null

disposable = Observable
   .just(/* Whatever */)
   .anyOperator()
   .anyOtherOperator()
   .subscribe(
      { /* onSuccess */ },
      { /* onError */ },
      {
         // onComplete
         // Make lint happy. It's already disposed because the stream completed.
         disposable?.dispose()
      }
   )

我对对此的任何评论都感兴趣,无论是确认正确性还是发现漏洞。

还有另一种方法可用,即避免手动使用 Disposables(添加和删除订阅)。

您可以定义一个 Observable 并且该 observable 将从 SubjectBehaviour 接收内容(如果您使用 RxJava)。通过将该可观察对象传递给您的 LiveData,这应该会起作用。查看基于初始问题的下一个示例:

private val playerSubject: Subject<Player> = BehaviorSubject.create()

private fun getPlayer(idPlayer: String) {
        playerSubject.onNext(idPlayer)
}

private val playerSuccessful: Observable<DataResult<Player>> = playerSubject
                        .flatMap { playerId ->
                            playerRepository.getPlayer(playerId).toObservable()
                        }
                        .share()

val playerFound: LiveData<Player>
    get() = playerSuccessful
        .filterAndMapDataSuccess()
        .toLiveData()

val playerNotFound: LiveData<Unit>
    get() = playerSuccessful.filterAndMapDataFailure()
        .map { Unit }
        .toLiveData()

// These are a couple of helpful extensions

fun <T> Observable<DataResult<T>>.filterAndMapDataSuccess(): Observable<T> =
filter { it is DataResult.Success }.map { (it as DataResult.Success).data }

fun <T> Observable<DataResult<T>>.filterAndMapDataFailure(): Observable<DataResult.Failure<T>> =
filter { it is DataResult.Failure }.map { it as DataResult.Failure<T> }