RxJava Live Reactive Queue(带开关)
RxJava Live Reactive Queue (with on off switch)
正在使用 kotlin 开发 android 应用程序。
我需要设置一个系统,以便能够从实时队列中执行工作(并在流中观察工作结果)。
但我还必须能够根据一些外部因素(也以流的形式出现)切换 "queue processing",例如 networkIsAvailable (Observable<Boolean>)
。
我不能使用 Observable.fromIterable()
,因为它会立即创建可迭代对象,并且此队列将调整并且项目可能会被删除。
我需要某种循环,我可以在其中完成项目,检查以确保我们应该继续进行,然后弹出队列中的第一项并执行此操作。
我不确定如何在订阅中执行类似这样的循环?
队列也可能变空,当开关重新打开时,事情应该会重新开始。
也许我应该将决定(关于是否处理队列中的下一个项目)推送到 Subject<Boolean>?
中,然后订阅该主题以再次启动该过程?
示例:
打开 ---- 处理队列顶部,处理队列顶部(之前被轮询出队列) --- 关闭 -- 不再处理
重新开启——处理队列顶部,队列为空——停止
将项目添加到队列 -- 处理 -- 停止队列为空
打开处理 -- 将项目添加到队列 -- 在重新打开之前不处理
打开 -- 处理顶部项目
为此,您可以使用 RxJava2Extensions 中的 valve()
运算符。
public PublishProcessor<Boolean> flowControl = PublishProcessor.create();
public void start() {
Flowable./*...*/
.compose(FlowableTransformers.valve(flowControl))
.subscribe(/*...*/);
在这种情况下,flowControl.onNext(true)
将启动流,flowControl.onNext(false)
将停止并排队。
(任何其他 Rx2 类型必须转换为 Flowable 才能使用。)
我最近一直在处理这个问题(在管理 NIO 选择键时听取压力)并总结出一个使用常见运算符的相对简单的解决方案:
Queue<T> queue;
// This controls whether the queue is being polled or not.
// It's probably a good idea to make sure this emits the last value upon
// subscription using a BehaviorProcessor or a ReplayFlowable etc.
Flowable<Boolean> condition;
// This is a "dumb" generator which continuously polls the queue and emits items
Flowable<T> generator = Flowable.generate(emitter -> {
T item = queue.poll();
if (item != null) {
emitter.onNext(item);
}
});
// We wire our generator and condition together to achieve the expected result.
Flowable<T> drainWhileCondition = condition.distinctUntilChanged()
.concatMap(c -> c
? generator.takeUntil(condition.filter(Boolean.FALSE::equals))
: Flowable.empty());
你也可以为此制作一个转换器方法:
static <T> FlowableTransformer<T, T> valve(Flowable<Boolean> condition) {
return upstream -> condition.distinctUntilChanged()
.concatMap(c -> c
? upstream.takeUntil(condition.filter(Boolean.FALSE::equals))
: Flowable.empty());
}
然后像这样使用它:
Flowable<T> generator;
Flowable<Boolean> condition;
Flowable<T> valvedGenerator = generator.compose(valve(condition));
正在使用 kotlin 开发 android 应用程序。
我需要设置一个系统,以便能够从实时队列中执行工作(并在流中观察工作结果)。
但我还必须能够根据一些外部因素(也以流的形式出现)切换 "queue processing",例如 networkIsAvailable (Observable<Boolean>)
。
我不能使用 Observable.fromIterable()
,因为它会立即创建可迭代对象,并且此队列将调整并且项目可能会被删除。
我需要某种循环,我可以在其中完成项目,检查以确保我们应该继续进行,然后弹出队列中的第一项并执行此操作。
我不确定如何在订阅中执行类似这样的循环?
队列也可能变空,当开关重新打开时,事情应该会重新开始。
也许我应该将决定(关于是否处理队列中的下一个项目)推送到 Subject<Boolean>?
中,然后订阅该主题以再次启动该过程?
示例:
打开 ---- 处理队列顶部,处理队列顶部(之前被轮询出队列) --- 关闭 -- 不再处理
重新开启——处理队列顶部,队列为空——停止
将项目添加到队列 -- 处理 -- 停止队列为空
打开处理 -- 将项目添加到队列 -- 在重新打开之前不处理
打开 -- 处理顶部项目
为此,您可以使用 RxJava2Extensions 中的 valve()
运算符。
public PublishProcessor<Boolean> flowControl = PublishProcessor.create();
public void start() {
Flowable./*...*/
.compose(FlowableTransformers.valve(flowControl))
.subscribe(/*...*/);
在这种情况下,flowControl.onNext(true)
将启动流,flowControl.onNext(false)
将停止并排队。
(任何其他 Rx2 类型必须转换为 Flowable 才能使用。)
我最近一直在处理这个问题(在管理 NIO 选择键时听取压力)并总结出一个使用常见运算符的相对简单的解决方案:
Queue<T> queue;
// This controls whether the queue is being polled or not.
// It's probably a good idea to make sure this emits the last value upon
// subscription using a BehaviorProcessor or a ReplayFlowable etc.
Flowable<Boolean> condition;
// This is a "dumb" generator which continuously polls the queue and emits items
Flowable<T> generator = Flowable.generate(emitter -> {
T item = queue.poll();
if (item != null) {
emitter.onNext(item);
}
});
// We wire our generator and condition together to achieve the expected result.
Flowable<T> drainWhileCondition = condition.distinctUntilChanged()
.concatMap(c -> c
? generator.takeUntil(condition.filter(Boolean.FALSE::equals))
: Flowable.empty());
你也可以为此制作一个转换器方法:
static <T> FlowableTransformer<T, T> valve(Flowable<Boolean> condition) {
return upstream -> condition.distinctUntilChanged()
.concatMap(c -> c
? upstream.takeUntil(condition.filter(Boolean.FALSE::equals))
: Flowable.empty());
}
然后像这样使用它:
Flowable<T> generator;
Flowable<Boolean> condition;
Flowable<T> valvedGenerator = generator.compose(valve(condition));