RxJava Live Reactive Queue(带开关)

RxJava Live Reactive Queue (with on off switch)

正在使用 kotlin 开发 android 应用程序。

我需要设置一个系统,以便能够从实时队列中执行工作(并在流中观察工作结果)。

但我还必须能够根据一些外部因素(也以流的形式出现)切换 "queue processing",例如 networkIsAvailable (Observable<Boolean>)

我不能使用 Observable.fromIterable(),因为它会立即创建可迭代对象,并且此队列将调整并且项目可能会被删除。

我需要某种循环,我可以在其中完成项目,检查以确保我们应该继续进行,然后弹出队列中的第一项并执行此操作。

我不确定如何在订阅中执行类似这样的循环?

队列也可能变空,当开关重新打开时,事情应该会重新开始。

也许我应该将决定(关于是否处理队列中的下一个项目)推送到 Subject<Boolean>? 中,然后订阅该主题以再次启动该过程?

示例:

打开 ---- 处理队列顶部,处理队列顶部(之前被轮询出队列) --- 关闭 -- 不再处理

重新开启——处理队列顶部,队列为空——停止

将项目添加到队列 -- 处理 -- 停止队列为空

打开处理 -- 将项目添加到队列 -- 在重新打开之前不处理

打开 -- 处理顶部项目

为此,您可以使用 RxJava2Extensions 中的 valve() 运算符。

public PublishProcessor<Boolean> flowControl = PublishProcessor.create();

public void start() {
    Flowable./*...*/
            .compose(FlowableTransformers.valve(flowControl))
            .subscribe(/*...*/);

在这种情况下,flowControl.onNext(true) 将启动流,flowControl.onNext(false) 将停止并排队。

(任何其他 Rx2 类型必须转换为 Flowable 才能使用。)

我最近一直在处理这个问题(在管理 NIO 选择键时听取压力)并总结出一个使用常见运算符的相对简单的解决方案:

Queue<T> queue;

// This controls whether the queue is being polled or not.
// It's probably a good idea to make sure this emits the last value upon
// subscription using a BehaviorProcessor or a ReplayFlowable etc.
Flowable<Boolean> condition;

// This is a "dumb" generator which continuously polls the queue and emits items
Flowable<T> generator = Flowable.generate(emitter -> {
    T item = queue.poll();
    if (item != null) {
        emitter.onNext(item);
    }
});

// We wire our generator and condition together to achieve the expected result.
Flowable<T> drainWhileCondition = condition.distinctUntilChanged()
    .concatMap(c -> c
        ? generator.takeUntil(condition.filter(Boolean.FALSE::equals))
        : Flowable.empty());

你也可以为此制作一个转换器方法:

static <T> FlowableTransformer<T, T> valve(Flowable<Boolean> condition) {
    return upstream -> condition.distinctUntilChanged()
            .concatMap(c -> c
                    ? upstream.takeUntil(condition.filter(Boolean.FALSE::equals))
                    : Flowable.empty());
}

然后像这样使用它:

Flowable<T> generator;
Flowable<Boolean> condition;

Flowable<T> valvedGenerator = generator.compose(valve(condition));