RxJava - 如何停止(和恢复)Hot Observable(间隔)?

RxJava - How to stop (and resume) a Hot Observable (interval)?

我有以下 Hot Observable:

hotObservable = Observable.interval(0L, 1L, TimeUnit.SECONDS)
                          .map((t) -> getCurrentTimeInMillis()))

但是,我找不到阻止它的好方法。我能够使用 takeWhileboolean 标志 (runTimer) 部分解决此问题:

Observable.interval(0L, 1L, TimeUnit.SECONDS)
          .takeWhile((t) -> runTimer)
          .map((t) -> getCurrentTimeInMillis()))

虽然有两点我不喜欢这种方法:

  1. 我必须保留标志 runTimer,我不想要。
  2. 一旦 runTimer 变为 false,Observable 就完成了,这意味着如果我想再次发射,我需要创建一个新的 Observable。我不想要那个。我只想让 Observable 停止发射物品,直到我告诉它重新开始。

我希望得到这样的东西:

hotObservable.stop();
hotObservable.resume();

这样我就不需要保留任何标志,并且 observable 始终处于活动状态(尽管它可能不会发出事件)。

我怎样才能做到这一点?

一种可能的方法是使用 BehaviorSubject 和 switchMap:

BehaviorSubject<Boolean> subject = BehaviorSubject.create(true);
hotObservable = subject.distinctUntilChanged().switchMap((on) -> {
    if (on) {
        return Observable.interval(0L, 1L, TimeUnit.SECONDS);
    } else {
        return Observable.never();
    }
}).map((t) -> getCurrentTimeInMillis());

通过向主题发送布尔值,可以控制可观察对象的输出。 subject.onNext(true) 将导致使用该主题创建的任何可观察对象开始发出值。 subject.onNext(false) 禁用该流。

switchMap 负责在关闭时处理底层的可观察对象。它还使用 distinctUntilChanged 来确保它不会进行不必要的切换。

也许你可以用这个

Observable.interval(3, TimeUnit.SECONDS)
        .takeUntil(stop)
        .subscribe(new MyObserver());

Thread.sleep(10000);
stop.onNext(-1);