如何在 RxJava 2 中使用 onResume() 和 onPause() 方法订阅和取消订阅?
How can I subscribe and unsubscribe using onResume() and onPause() methods with RxJava 2?
好吧,我需要将发射绑定到我的 activity 的生命周期。我怎样才能做到这一点?我应该什么时候创建观察者和可观察实例?
如果您有一个有时需要数据而有时不需要数据的可观察对象,有一种使用 switchMap()
运算符订阅和取消订阅的简单方法。
假设您有一个需要数据的可观察对象:
Observable<LocationData> locationDataObservable;
那么,如果你引入一个切换可观察对象:
PublishSubject<Boolean> switchObservable = PublishSubject.create();
您可以控制对第一个可观察对象的订阅:
Observable<LocationData> switchedLocationDataObservable =
switchObservable
.switchMap( abled -> abled ? locationDataObservable : Observable.never() )
.subscribe();
要启用接收数据,请执行
switchObservable.onNext( Boolean.TRUE );
并禁用,
switchObservable.onNext( Boolean.FALSE );
switchMap()
接线员负责为您订阅和取消订阅。
不用Observable.never()即可解决。只需将 BehaviourSubject 与 takeUntil 和 repeatWhen 运算符一起使用。
private fun getPauseUpdatesSource(): Flowable<Unit> {
return setUpdatesPausedSubject
.filter { it }
.map { }
.toFlowableLatest()
}
private fun getResumeUpdatesSource(): Flowable<Unit> {
return setUpdatesPausedSubject
.filter { it.not() }
.map { }
.toFlowableLatest()
}
在 rx 链中:
locationDataObservable
.takeUntil(getPauseUpdatesSource())
.repeatWhen { it.switchMap { getResumeUpdatesSource() } }
它只会暂停你的链,不会发出任何空的 observables
好吧,我需要将发射绑定到我的 activity 的生命周期。我怎样才能做到这一点?我应该什么时候创建观察者和可观察实例?
如果您有一个有时需要数据而有时不需要数据的可观察对象,有一种使用 switchMap()
运算符订阅和取消订阅的简单方法。
假设您有一个需要数据的可观察对象:
Observable<LocationData> locationDataObservable;
那么,如果你引入一个切换可观察对象:
PublishSubject<Boolean> switchObservable = PublishSubject.create();
您可以控制对第一个可观察对象的订阅:
Observable<LocationData> switchedLocationDataObservable =
switchObservable
.switchMap( abled -> abled ? locationDataObservable : Observable.never() )
.subscribe();
要启用接收数据,请执行
switchObservable.onNext( Boolean.TRUE );
并禁用,
switchObservable.onNext( Boolean.FALSE );
switchMap()
接线员负责为您订阅和取消订阅。
不用Observable.never()即可解决。只需将 BehaviourSubject 与 takeUntil 和 repeatWhen 运算符一起使用。
private fun getPauseUpdatesSource(): Flowable<Unit> {
return setUpdatesPausedSubject
.filter { it }
.map { }
.toFlowableLatest()
}
private fun getResumeUpdatesSource(): Flowable<Unit> {
return setUpdatesPausedSubject
.filter { it.not() }
.map { }
.toFlowableLatest()
}
在 rx 链中:
locationDataObservable
.takeUntil(getPauseUpdatesSource())
.repeatWhen { it.switchMap { getResumeUpdatesSource() } }
它只会暂停你的链,不会发出任何空的 observables