RxJava:如何在不破坏链的情况下有条件地将 Operators 应用于 Observable

RxJava: How to conditionally apply Operators to an Observable without breaking the chain

我在 RxJava observable 上有一个运算符链。我希望能够根据没有 "breaking the chain".

的布尔值应用两个运算符之一

我对 Rx(Java) 比较陌生,我觉得可能有比我目前引入临时变量的方法更惯用和可读的方法。

这是一个具体的例子,如果批处理大小字段为非空,则从可观察对象中缓冲项目,否则将发出单个无限制大小的批处理 toList():

Observable<Item> source = Observable.from(newItems);
Observable<List<Item>> batchedSource = batchSize == null ?
                source.toList() :
                source.buffer(batchSize);
return batchedSource.flatMap(...).map(...)

这样的事情可能吗? (伪 lambda,因为 Java):

Observable.from(newItems)
    .applyIf(batchSize == null,
             { o.toList() },
             { o.buffer(batchSize) })
    .flatMap(...).map(...)

您可以使用 compose(Func1) 保持顺序,但执行自定义行为

source
.compose(o -> condition ? o.map(v -> v + 1) : o.map(v -> v * v))
.filter(...)
.subscribe(...)

如果您希望发射单个值,您也可以使用 filter 运算符和 defaultIfEmpty,或者如果您希望使用另一个 Observable 发射多个值,则可以使用 switchIfEmpty

val item = Observable.just("ABC")

item.filter { s -> s.startsWith("Z") }
    .defaultIfEmpty("None")
    .subscribe { println(it) }