用于组合地图的 RxJava 组合器?
RxJava combinator for combining maps?
我有 Map
个这样的任务:
Map<Identifier, Observable<Progress>> tasks;
我想将其转换为:
Observable<Map<Identifier, Progress>>
其中每个 Map<Identifier, Progress> element
包含每个任务的最新元素。
我希望新的 Observable
仅在 all 任务完成时完成,而在 any 其中之一时失败任务失败。
有这个的组合器吗?
要将单个 Map<Identifier, Observable<Progress>> task
转换为 <Identifier, Progress>
,请使用:
Observable.just(Identifier)
.flatMap(Observable<Progress> progressObservable,
(Identifier, Progress, Pair<Identifier, Progress>) -> {create pair of elements})
要将这些任务组合在一起,请使用 combineLatest 运算符
Observable.combineLatest(List<Observable<Pair<Identifier, Progress>>>,
(List<Pair<Identifier, Progress>>) -> {merge into `Map`})
希望我能很好地理解你的想法。当然是用伪代码写的。
一个实现:
public static <T, S> Observable<Map<T, S>> zipMaps(final Map<T, Observable<S>> tasks) {
Objects.requireNonNull(tasks, "tasks is null");
return Observable.combineLatest(
tasks.entrySet()
.stream()
.map(entry -> Observable.combineLatest(
Observable.just(entry.getKey()),
entry.getValue(),
Pair::with))
.collect(ImmutableList.toImmutableList()),
xs -> Arrays.stream(xs)
.map(x -> (Pair<T, S>)x)
.collect(ImmutableMap.toImmutableMap(Pair::getValue0, Pair::getValue1)));
}
我有 Map
个这样的任务:
Map<Identifier, Observable<Progress>> tasks;
我想将其转换为:
Observable<Map<Identifier, Progress>>
其中每个 Map<Identifier, Progress> element
包含每个任务的最新元素。
我希望新的 Observable
仅在 all 任务完成时完成,而在 any 其中之一时失败任务失败。
有这个的组合器吗?
要将单个 Map<Identifier, Observable<Progress>> task
转换为 <Identifier, Progress>
,请使用:
Observable.just(Identifier)
.flatMap(Observable<Progress> progressObservable,
(Identifier, Progress, Pair<Identifier, Progress>) -> {create pair of elements})
要将这些任务组合在一起,请使用 combineLatest 运算符
Observable.combineLatest(List<Observable<Pair<Identifier, Progress>>>,
(List<Pair<Identifier, Progress>>) -> {merge into `Map`})
希望我能很好地理解你的想法。当然是用伪代码写的。
一个实现:
public static <T, S> Observable<Map<T, S>> zipMaps(final Map<T, Observable<S>> tasks) {
Objects.requireNonNull(tasks, "tasks is null");
return Observable.combineLatest(
tasks.entrySet()
.stream()
.map(entry -> Observable.combineLatest(
Observable.just(entry.getKey()),
entry.getValue(),
Pair::with))
.collect(ImmutableList.toImmutableList()),
xs -> Arrays.stream(xs)
.map(x -> (Pair<T, S>)x)
.collect(ImmutableMap.toImmutableMap(Pair::getValue0, Pair::getValue1)));
}