RxJava:如何在不破坏链的情况下有条件地将 Operators 应用于 Observable
RxJava: How to conditionally apply Operators to an Observable without breaking the chain
我在 RxJava observable 上有一个运算符链。我希望能够根据没有 "breaking the chain".
的布尔值应用两个运算符之一
我对 Rx(Java) 比较陌生,我觉得可能有比我目前引入临时变量的方法更惯用和可读的方法。
这是一个具体的例子,如果批处理大小字段为非空,则从可观察对象中缓冲项目,否则将发出单个无限制大小的批处理 toList()
:
Observable<Item> source = Observable.from(newItems);
Observable<List<Item>> batchedSource = batchSize == null ?
source.toList() :
source.buffer(batchSize);
return batchedSource.flatMap(...).map(...)
这样的事情可能吗? (伪 lambda,因为 Java):
Observable.from(newItems)
.applyIf(batchSize == null,
{ o.toList() },
{ o.buffer(batchSize) })
.flatMap(...).map(...)
您可以使用 compose(Func1)
保持顺序,但执行自定义行为
source
.compose(o -> condition ? o.map(v -> v + 1) : o.map(v -> v * v))
.filter(...)
.subscribe(...)
如果您希望发射单个值,您也可以使用 filter
运算符和 defaultIfEmpty
,或者如果您希望使用另一个 Observable 发射多个值,则可以使用 switchIfEmpty
。
val item = Observable.just("ABC")
item.filter { s -> s.startsWith("Z") }
.defaultIfEmpty("None")
.subscribe { println(it) }
我在 RxJava observable 上有一个运算符链。我希望能够根据没有 "breaking the chain".
的布尔值应用两个运算符之一我对 Rx(Java) 比较陌生,我觉得可能有比我目前引入临时变量的方法更惯用和可读的方法。
这是一个具体的例子,如果批处理大小字段为非空,则从可观察对象中缓冲项目,否则将发出单个无限制大小的批处理 toList()
:
Observable<Item> source = Observable.from(newItems);
Observable<List<Item>> batchedSource = batchSize == null ?
source.toList() :
source.buffer(batchSize);
return batchedSource.flatMap(...).map(...)
这样的事情可能吗? (伪 lambda,因为 Java):
Observable.from(newItems)
.applyIf(batchSize == null,
{ o.toList() },
{ o.buffer(batchSize) })
.flatMap(...).map(...)
您可以使用 compose(Func1)
保持顺序,但执行自定义行为
source
.compose(o -> condition ? o.map(v -> v + 1) : o.map(v -> v * v))
.filter(...)
.subscribe(...)
如果您希望发射单个值,您也可以使用 filter
运算符和 defaultIfEmpty
,或者如果您希望使用另一个 Observable 发射多个值,则可以使用 switchIfEmpty
。
val item = Observable.just("ABC")
item.filter { s -> s.startsWith("Z") }
.defaultIfEmpty("None")
.subscribe { println(it) }