使用 share():post 给新订阅者的最后一个值

Using share(): post last value to new subscribers

@Override
public void onPause() {
    super.onPause();
    compositeDisposable.clear();
}

@Override
public void onResume() {
    super.onResume();

    Flowable<MyData> distanceFlowable = myDataProcessor.hide().onBackpressureLatest()
            .distinctUntilChanged()
            .share();

    compositeDisposable.add(distanceFlowable)
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe(data -> {Log.w("observer 1", data.value)});

    compositeDisposable.add(distanceFlowable)
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe(data -> {Log.w("observer 2", data.value)});
}

当 distanceFlowable 不断获得新变化时,代码运行良好。然而,在 distanceFlowable 上只有一个 post 的情况下,只有 observer 1 会收到通知。

它的行为是这样的:

  1. post 在 distanceFlowable
  2. 观察者 1 订阅
  3. logcat 打印 "observer 1: 133"
  4. 观察者 2 订阅

我希望它表现得像:

  1. post 在 distanceFlowable
  2. 观察者 1 订阅
  3. logcat 打印 "observer 1: 133"
  4. 观察者 2 订阅
  5. logcat 打印 "observer 2: 133"

我尝试使用 ConnectedFlowable 代替 publish(),然后 connect()observer 1 之后使用 connect()观察者 2 已订阅。但是即使 compositeDisposable 被清除并且没有人在听,它仍然 posts 是可流动的。

解决这个问题的首选方法是什么?

我错过了 flowable.connect() returns 我可以添加到我的 compositeDisposableDisposable,这样它最终会在 onPause 中被清除。

只需使用 .compose(ReplayingShare.instance()); 来自 https://github.com/JakeWharton/RxReplayingShare

应该可以。