将多个 Mono<List<Item>> 合并为一个
combine multiple Mono<List<Item>> into one
我目前正在从事一个涉及一些反应式编程的项目。
我有 4 个不同的反应性存储库,从中我分别在 return 中得到 4 个不同的 Mono<List<SomeType>>
。
目标是 将它们组合成一个 Mono<List<GeneralType>>
,以便将其合并到 ResponseEntity.ok()
中对 return 的自定义响应中。我已经创建了一个 GeneralType
并且成功地转换了一个 Mono<List<SomeType>>
,但是没有取得进一步的进展。
所有存储库都有相似的签名:
public Mono<List<SomeType>> findAllByUserId(UUID userId)
我的回复中将所有不同列表合并为一个列表的字段:
private Mono<List<GeneralType>> items;
到目前为止我的方法是什么样的:
public Mono<List<GeneralType>> combineMonos(UUID userId) {
Mono<List<GeneralType>> combo1 = reactiveRepository.findAllByUserId(userId)
.map(list -> list.stream()
.map(GeneralType::new)
.collect(Collectors.toList()));
return combo1; // works just fine
}
所有其他列表都有几乎相同的方法,但是将它们放在一个单一的 Mono 中是一个问题。
我试过以下方法:
return Flux.merge(combo1.flatMapMany(Flux::fromIterable), combo2.flatMapMany(Flux::fromIterable)).collectList();
但是,IDE 敦促将 return 类型更改为 Flux<Object>
。
此外,某些列表可能为空,因此我不确定 zip()
是否是此处的一个选项。我读过,如果至少有一个结果为空,它将 return 一切为 empty。
所以问题是 如何在没有 block() 的情况下以有效的方式完成?
Merge 热切地连接到所有数据源。因此,当数据从任何来源发出时,它将被传递到下游管道。结果列表中的顺序基于项目发出的时间。
Zip 方法从源收集数据并将它们放入一个对象(元组——类似于一个盒子)并传递给下游。只要所有源发出数据,Zip 就会工作。任何来源 completes/throws 错误,它将停止。
我假设您的个人方法工作正常。您的问题与将结果合并到一个列表中有关。
private Mono<List<String>> getList1(){
return Mono.just(List.of("a", "b", "c"));
}
private Mono<List<String>> getList2(){
return Mono.just(Collections.emptyList());
}
private Mono<List<String>> getList3(){
return Mono.just(List.of("A", "B", "C"));
}
Flux.merge(getList1(), getList2(), getList3())
.flatMapIterable(Function.identity())
.collectList()
.subscribe(System.out::println); // [a, b, c, A, B, C]
参考:
http://www.vinsguru.com/reactive-programming-reactor-combining-multiple-sources-of-flux-mono/
A Mono::zip
将三个发布者异步组合在一起,我认为这是最好的解决方案。
否则这是一个非常简单的问题:
Mono<List<String>> m1 = Mono.just(Arrays.asList(new String[]{"A", "B", "C"}));
Mono<List<Character>> m2 = Mono.just(Arrays.asList(new Character[]{'a', 'b', 'c'}));
Mono<List<Integer>> m3 = Mono.just(Arrays.asList(new Integer[]{1, 2, 3}));
Mono.zip(m1, m2, m3)
.map(tuple3->{
List<Combined> c = new ArrayList<>();
int size = tuple3.getT1().size();
for ( int i=0; i < size; ++i ) {
c.add(new Combined(tuple3.getT1().get(i), tuple3.getT2().get(i), tuple3.getT3().get(i)));
}
return c;
})
.subscribe(System.out::println);
// [Combined(s=A, c=a, i=1), Combined(s=B, c=b, i=2), Combined(s=C, c=c, i=3)]
为了完整起见:
@Data
@AllArgsConstructor
class Combined {
String s;
Character c;
Integer i;
}
我目前正在从事一个涉及一些反应式编程的项目。
我有 4 个不同的反应性存储库,从中我分别在 return 中得到 4 个不同的 Mono<List<SomeType>>
。
目标是 将它们组合成一个 Mono<List<GeneralType>>
,以便将其合并到 ResponseEntity.ok()
中对 return 的自定义响应中。我已经创建了一个 GeneralType
并且成功地转换了一个 Mono<List<SomeType>>
,但是没有取得进一步的进展。
所有存储库都有相似的签名:
public Mono<List<SomeType>> findAllByUserId(UUID userId)
我的回复中将所有不同列表合并为一个列表的字段:
private Mono<List<GeneralType>> items;
到目前为止我的方法是什么样的:
public Mono<List<GeneralType>> combineMonos(UUID userId) {
Mono<List<GeneralType>> combo1 = reactiveRepository.findAllByUserId(userId)
.map(list -> list.stream()
.map(GeneralType::new)
.collect(Collectors.toList()));
return combo1; // works just fine
}
所有其他列表都有几乎相同的方法,但是将它们放在一个单一的 Mono 中是一个问题。
我试过以下方法:
return Flux.merge(combo1.flatMapMany(Flux::fromIterable), combo2.flatMapMany(Flux::fromIterable)).collectList();
但是,IDE 敦促将 return 类型更改为 Flux<Object>
。
此外,某些列表可能为空,因此我不确定 zip()
是否是此处的一个选项。我读过,如果至少有一个结果为空,它将 return 一切为 empty。
所以问题是 如何在没有 block() 的情况下以有效的方式完成?
Merge 热切地连接到所有数据源。因此,当数据从任何来源发出时,它将被传递到下游管道。结果列表中的顺序基于项目发出的时间。
Zip 方法从源收集数据并将它们放入一个对象(元组——类似于一个盒子)并传递给下游。只要所有源发出数据,Zip 就会工作。任何来源 completes/throws 错误,它将停止。
我假设您的个人方法工作正常。您的问题与将结果合并到一个列表中有关。
private Mono<List<String>> getList1(){
return Mono.just(List.of("a", "b", "c"));
}
private Mono<List<String>> getList2(){
return Mono.just(Collections.emptyList());
}
private Mono<List<String>> getList3(){
return Mono.just(List.of("A", "B", "C"));
}
Flux.merge(getList1(), getList2(), getList3())
.flatMapIterable(Function.identity())
.collectList()
.subscribe(System.out::println); // [a, b, c, A, B, C]
参考: http://www.vinsguru.com/reactive-programming-reactor-combining-multiple-sources-of-flux-mono/
A Mono::zip
将三个发布者异步组合在一起,我认为这是最好的解决方案。
否则这是一个非常简单的问题:
Mono<List<String>> m1 = Mono.just(Arrays.asList(new String[]{"A", "B", "C"}));
Mono<List<Character>> m2 = Mono.just(Arrays.asList(new Character[]{'a', 'b', 'c'}));
Mono<List<Integer>> m3 = Mono.just(Arrays.asList(new Integer[]{1, 2, 3}));
Mono.zip(m1, m2, m3)
.map(tuple3->{
List<Combined> c = new ArrayList<>();
int size = tuple3.getT1().size();
for ( int i=0; i < size; ++i ) {
c.add(new Combined(tuple3.getT1().get(i), tuple3.getT2().get(i), tuple3.getT3().get(i)));
}
return c;
})
.subscribe(System.out::println);
// [Combined(s=A, c=a, i=1), Combined(s=B, c=b, i=2), Combined(s=C, c=c, i=3)]
为了完整起见:
@Data
@AllArgsConstructor
class Combined {
String s;
Character c;
Integer i;
}