使用 share():post 给新订阅者的最后一个值
Using share(): post last value to new subscribers
@Override
public void onPause() {
super.onPause();
compositeDisposable.clear();
}
@Override
public void onResume() {
super.onResume();
Flowable<MyData> distanceFlowable = myDataProcessor.hide().onBackpressureLatest()
.distinctUntilChanged()
.share();
compositeDisposable.add(distanceFlowable)
.observeOn(AndroidSchedulers.mainThread())
.subscribe(data -> {Log.w("observer 1", data.value)});
compositeDisposable.add(distanceFlowable)
.observeOn(AndroidSchedulers.mainThread())
.subscribe(data -> {Log.w("observer 2", data.value)});
}
当 distanceFlowable 不断获得新变化时,代码运行良好。然而,在 distanceFlowable 上只有一个 post 的情况下,只有 observer 1 会收到通知。
它的行为是这样的:
- post 在 distanceFlowable
- 观察者 1 订阅
- logcat 打印 "observer 1: 133"
- 观察者 2 订阅
我希望它表现得像:
- post 在 distanceFlowable
- 观察者 1 订阅
- logcat 打印 "observer 1: 133"
- 观察者 2 订阅
- logcat 打印 "observer 2: 133"
我尝试使用 ConnectedFlowable
代替 publish()
,然后 connect()
在 observer 1 和 之后使用 connect()
观察者 2 已订阅。但是即使 compositeDisposable
被清除并且没有人在听,它仍然 posts 是可流动的。
解决这个问题的首选方法是什么?
我错过了 flowable.connect()
returns 我可以添加到我的 compositeDisposable
的 Disposable
,这样它最终会在 onPause 中被清除。
只需使用 .compose(ReplayingShare.instance());
来自
https://github.com/JakeWharton/RxReplayingShare
应该可以。
@Override
public void onPause() {
super.onPause();
compositeDisposable.clear();
}
@Override
public void onResume() {
super.onResume();
Flowable<MyData> distanceFlowable = myDataProcessor.hide().onBackpressureLatest()
.distinctUntilChanged()
.share();
compositeDisposable.add(distanceFlowable)
.observeOn(AndroidSchedulers.mainThread())
.subscribe(data -> {Log.w("observer 1", data.value)});
compositeDisposable.add(distanceFlowable)
.observeOn(AndroidSchedulers.mainThread())
.subscribe(data -> {Log.w("observer 2", data.value)});
}
当 distanceFlowable 不断获得新变化时,代码运行良好。然而,在 distanceFlowable 上只有一个 post 的情况下,只有 observer 1 会收到通知。
它的行为是这样的:
- post 在 distanceFlowable
- 观察者 1 订阅
- logcat 打印 "observer 1: 133"
- 观察者 2 订阅
我希望它表现得像:
- post 在 distanceFlowable
- 观察者 1 订阅
- logcat 打印 "observer 1: 133"
- 观察者 2 订阅
- logcat 打印 "observer 2: 133"
我尝试使用 ConnectedFlowable
代替 publish()
,然后 connect()
在 observer 1 和 之后使用 connect()
观察者 2 已订阅。但是即使 compositeDisposable
被清除并且没有人在听,它仍然 posts 是可流动的。
解决这个问题的首选方法是什么?
我错过了 flowable.connect()
returns 我可以添加到我的 compositeDisposable
的 Disposable
,这样它最终会在 onPause 中被清除。
只需使用 .compose(ReplayingShare.instance());
来自
https://github.com/JakeWharton/RxReplayingShare
应该可以。