RxJava - 如何停止(和恢复)Hot Observable(间隔)?
RxJava - How to stop (and resume) a Hot Observable (interval)?
我有以下 Hot Observable:
hotObservable = Observable.interval(0L, 1L, TimeUnit.SECONDS)
.map((t) -> getCurrentTimeInMillis()))
但是,我找不到阻止它的好方法。我能够使用 takeWhile
和 boolean
标志 (runTimer
) 部分解决此问题:
Observable.interval(0L, 1L, TimeUnit.SECONDS)
.takeWhile((t) -> runTimer)
.map((t) -> getCurrentTimeInMillis()))
虽然有两点我不喜欢这种方法:
- 我必须保留标志
runTimer
,我不想要。
- 一旦
runTimer
变为 false
,Observable 就完成了,这意味着如果我想再次发射,我需要创建一个新的 Observable。我不想要那个。我只想让 Observable 停止发射物品,直到我告诉它重新开始。
我希望得到这样的东西:
hotObservable.stop();
hotObservable.resume();
这样我就不需要保留任何标志,并且 observable 始终处于活动状态(尽管它可能不会发出事件)。
我怎样才能做到这一点?
一种可能的方法是使用 BehaviorSubject 和 switchMap:
BehaviorSubject<Boolean> subject = BehaviorSubject.create(true);
hotObservable = subject.distinctUntilChanged().switchMap((on) -> {
if (on) {
return Observable.interval(0L, 1L, TimeUnit.SECONDS);
} else {
return Observable.never();
}
}).map((t) -> getCurrentTimeInMillis());
通过向主题发送布尔值,可以控制可观察对象的输出。 subject.onNext(true)
将导致使用该主题创建的任何可观察对象开始发出值。 subject.onNext(false)
禁用该流。
switchMap
负责在关闭时处理底层的可观察对象。它还使用 distinctUntilChanged
来确保它不会进行不必要的切换。
也许你可以用这个
Observable.interval(3, TimeUnit.SECONDS)
.takeUntil(stop)
.subscribe(new MyObserver());
Thread.sleep(10000);
stop.onNext(-1);
我有以下 Hot Observable:
hotObservable = Observable.interval(0L, 1L, TimeUnit.SECONDS)
.map((t) -> getCurrentTimeInMillis()))
但是,我找不到阻止它的好方法。我能够使用 takeWhile
和 boolean
标志 (runTimer
) 部分解决此问题:
Observable.interval(0L, 1L, TimeUnit.SECONDS)
.takeWhile((t) -> runTimer)
.map((t) -> getCurrentTimeInMillis()))
虽然有两点我不喜欢这种方法:
- 我必须保留标志
runTimer
,我不想要。 - 一旦
runTimer
变为false
,Observable 就完成了,这意味着如果我想再次发射,我需要创建一个新的 Observable。我不想要那个。我只想让 Observable 停止发射物品,直到我告诉它重新开始。
我希望得到这样的东西:
hotObservable.stop();
hotObservable.resume();
这样我就不需要保留任何标志,并且 observable 始终处于活动状态(尽管它可能不会发出事件)。
我怎样才能做到这一点?
一种可能的方法是使用 BehaviorSubject 和 switchMap:
BehaviorSubject<Boolean> subject = BehaviorSubject.create(true);
hotObservable = subject.distinctUntilChanged().switchMap((on) -> {
if (on) {
return Observable.interval(0L, 1L, TimeUnit.SECONDS);
} else {
return Observable.never();
}
}).map((t) -> getCurrentTimeInMillis());
通过向主题发送布尔值,可以控制可观察对象的输出。 subject.onNext(true)
将导致使用该主题创建的任何可观察对象开始发出值。 subject.onNext(false)
禁用该流。
switchMap
负责在关闭时处理底层的可观察对象。它还使用 distinctUntilChanged
来确保它不会进行不必要的切换。
也许你可以用这个
Observable.interval(3, TimeUnit.SECONDS)
.takeUntil(stop)
.subscribe(new MyObserver());
Thread.sleep(10000);
stop.onNext(-1);