如何在 RxJava 2 中使用 onResume() 和 onPause() 方法订阅和取消订阅?

How can I subscribe and unsubscribe using onResume() and onPause() methods with RxJava 2?

好吧,我需要将发射绑定到我的 activity 的生命周期。我怎样才能做到这一点?我应该什么时候创建观察者和可观察实例?

如果您有一个有时需要数据而有时不需要数据的可观察对象,有一种使用 switchMap() 运算符订阅和取消订阅的简单方法。

假设您有一个需要数据的可观察对象:

Observable<LocationData> locationDataObservable;

那么,如果你引入一个切换可观察对象:

PublishSubject<Boolean> switchObservable = PublishSubject.create();

您可以控制对第一个可观察对象的订阅:

Observable<LocationData> switchedLocationDataObservable =
  switchObservable
    .switchMap( abled -> abled ? locationDataObservable : Observable.never() )
    .subscribe();

要启用接收数据,请执行

switchObservable.onNext( Boolean.TRUE );

并禁用,

switchObservable.onNext( Boolean.FALSE );

switchMap() 接线员负责为您订阅和取消订阅。

不用Observable.never()即可解决。只需将 BehaviourSubject 与 takeUntil 和 repeatWhen 运算符一起使用。

private fun getPauseUpdatesSource(): Flowable<Unit> {
    return setUpdatesPausedSubject
        .filter { it }
        .map { }
        .toFlowableLatest()
}

private fun getResumeUpdatesSource(): Flowable<Unit> {
    return setUpdatesPausedSubject
        .filter { it.not() }
        .map { }
        .toFlowableLatest()
}

在 rx 链中:

locationDataObservable
                .takeUntil(getPauseUpdatesSource())
                .repeatWhen { it.switchMap { getResumeUpdatesSource() } }

它只会暂停你的链,不会发出任何空的 observables