暂停和恢复基于 RxJava 中的布尔门的可观察对象 2.X?
Pause and resume an observable based on a boolean gate in RxJava 2.X?
假设我有一个处理器,当按下按钮时它会发出一个布尔值,将其视为一个切换。
boolean gateValue = true;
PublishProcessor<Boolean> gate = PublishProcessor.create();
view.onButtonClicked()
.subscribe(new Action1<Void>() {
@Override
public void call(final Void aVoid) {
gate.onNext(gateValue = !gateValue);
}
}));
我想做的是使用门的值来暂停和恢复可观察序列,在暂停时缓冲发出的值。
我已经阅读了很多,虽然在其他语言的响应式扩展中似乎可行,但 RxJava 似乎不支持它。
这是我想要实现的示例,它只是每秒输出一个增量值。当我按下按钮时,我希望输出停止,直到我再次按下它,这应该输出两次按钮按下之间发出的每个项目:
Flowable.interval(1, TimeUnit.SECONDS)
.bufferWhile(gate)
.flatMapIterable(longs -> longs)
.subscribe(new Consumer<Long>() {
@Override
public void accept(final Long aLong) throws Exception {
view.displayTime(aLong);
}
});
有谁知道实现类似目标的方法吗?
编辑 我写了一篇博客 post 关于如何实现这个 https://medium.com/@scottalancooper/pausing-and-resuming-a-stream-in-rxjava-988a0977b771#.gj7fsi1xk
只需使用带有一个 Observable<T>
参数的现成 Observable.window
运算符。
RxJava2Extensions 库中现在有一个运算符 valve()
可以执行请求的行为。
您可以使用默认的 RxJava 运算符实现此目的:
final Random random = new Random();
final Observable<Boolean> gates = Observable.interval(10, TimeUnit.SECONDS)
.map(it -> random.nextBoolean())
.startWith(false)
.doOnNext(it -> System.out.println("Gate " + (it ? "opened" : "closed")))
.take(100);
final Observable<Long> data = Observable.interval(3, TimeUnit.SECONDS)
.doOnNext(it -> System.out.println("New value " + it))
.take(100);
gates.publish(innerGates -> data.publish(innerData -> Observable.merge(
innerData.window(
innerGates.distinctUntilChanged().filter(it -> it),
(ignore -> innerGates.distinctUntilChanged().filter(it -> !it))
).flatMap(it -> it),
innerData.buffer(
innerGates.distinctUntilChanged().filter(it -> !it),
(ignore -> innerGates.distinctUntilChanged().filter(it -> it))
)
.flatMap(Observable::from)
)))
.subscribe(it -> System.out.println("On next " + it));
假设我有一个处理器,当按下按钮时它会发出一个布尔值,将其视为一个切换。
boolean gateValue = true;
PublishProcessor<Boolean> gate = PublishProcessor.create();
view.onButtonClicked()
.subscribe(new Action1<Void>() {
@Override
public void call(final Void aVoid) {
gate.onNext(gateValue = !gateValue);
}
}));
我想做的是使用门的值来暂停和恢复可观察序列,在暂停时缓冲发出的值。
我已经阅读了很多,虽然在其他语言的响应式扩展中似乎可行,但 RxJava 似乎不支持它。
这是我想要实现的示例,它只是每秒输出一个增量值。当我按下按钮时,我希望输出停止,直到我再次按下它,这应该输出两次按钮按下之间发出的每个项目:
Flowable.interval(1, TimeUnit.SECONDS)
.bufferWhile(gate)
.flatMapIterable(longs -> longs)
.subscribe(new Consumer<Long>() {
@Override
public void accept(final Long aLong) throws Exception {
view.displayTime(aLong);
}
});
有谁知道实现类似目标的方法吗?
编辑 我写了一篇博客 post 关于如何实现这个 https://medium.com/@scottalancooper/pausing-and-resuming-a-stream-in-rxjava-988a0977b771#.gj7fsi1xk
只需使用带有一个 Observable<T>
参数的现成 Observable.window
运算符。
RxJava2Extensions 库中现在有一个运算符 valve()
可以执行请求的行为。
您可以使用默认的 RxJava 运算符实现此目的:
final Random random = new Random();
final Observable<Boolean> gates = Observable.interval(10, TimeUnit.SECONDS)
.map(it -> random.nextBoolean())
.startWith(false)
.doOnNext(it -> System.out.println("Gate " + (it ? "opened" : "closed")))
.take(100);
final Observable<Long> data = Observable.interval(3, TimeUnit.SECONDS)
.doOnNext(it -> System.out.println("New value " + it))
.take(100);
gates.publish(innerGates -> data.publish(innerData -> Observable.merge(
innerData.window(
innerGates.distinctUntilChanged().filter(it -> it),
(ignore -> innerGates.distinctUntilChanged().filter(it -> !it))
).flatMap(it -> it),
innerData.buffer(
innerGates.distinctUntilChanged().filter(it -> !it),
(ignore -> innerGates.distinctUntilChanged().filter(it -> it))
)
.flatMap(Observable::from)
)))
.subscribe(it -> System.out.println("On next " + it));