将多个 Mono<List<Item>> 合并为一个

combine multiple Mono<List<Item>> into one

我目前正在从事一个涉及一些反应式编程的项目。

我有 4 个不同的反应性存储库,从中我分别在 return 中得到 4 个不同的 Mono<List<SomeType>>。 目标是 将它们组合成一个 Mono<List<GeneralType>>,以便将其合并到 ResponseEntity.ok() 中对 return 的自定义响应中。我已经创建了一个 GeneralType 并且成功地转换了一个 Mono<List<SomeType>>,但是没有取得进一步的进展。

所有存储库都有相似的签名:

public Mono<List<SomeType>> findAllByUserId(UUID userId)

我的回复中将所有不同列表合并为一个列表的字段:

private Mono<List<GeneralType>> items;

到目前为止我的方法是什么样的:

public Mono<List<GeneralType>> combineMonos(UUID userId) {
    Mono<List<GeneralType>> combo1 = reactiveRepository.findAllByUserId(userId)
        .map(list -> list.stream()
            .map(GeneralType::new)
            .collect(Collectors.toList()));
    return combo1; // works just fine
}

所有其他列表都有几乎相同的方法,但是将它们放在一个单一的 Mono 中是一个问题。

我试过以下方法:

return Flux.merge(combo1.flatMapMany(Flux::fromIterable), combo2.flatMapMany(Flux::fromIterable)).collectList();

但是,IDE 敦促将 return 类型更改为 Flux<Object>。 此外,某些列表可能为空,因此我不确定 zip() 是否是此处的一个选项。我读过,如果至少有一个结果为空,它将 return 一切为 empty

所以问题是 如何在没有 block() 的情况下以有效的方式完成?

Merge 热切地连接到所有数据源。因此,当数据从任何来源发出时,它将被传递到下游管道。结果列表中的顺序基于项目发出的时间。

Zip 方法从源收集数据并将它们放入一个对象(元组——类似于一个盒子)并传递给下游。只要所有源发出数据,Zip 就会工作。任何来源 completes/throws 错误,它将停止。


我假设您的个人方法工作正常。您的问题与将结果合并到一个列表中有关。

private Mono<List<String>> getList1(){
    return Mono.just(List.of("a", "b", "c"));
}

private Mono<List<String>> getList2(){
    return Mono.just(Collections.emptyList());
}

private Mono<List<String>> getList3(){
    return Mono.just(List.of("A", "B", "C"));
}


    Flux.merge(getList1(), getList2(), getList3())
            .flatMapIterable(Function.identity())
            .collectList()
            .subscribe(System.out::println);  // [a, b, c, A, B, C]

参考: http://www.vinsguru.com/reactive-programming-reactor-combining-multiple-sources-of-flux-mono/

A​​ Mono::zip 将三个发布者异步组合在一起,我认为这是最好的解决方案。

否则这是一个非常简单的问题:

Mono<List<String>> m1 = Mono.just(Arrays.asList(new String[]{"A", "B", "C"}));
Mono<List<Character>> m2 = Mono.just(Arrays.asList(new Character[]{'a', 'b', 'c'}));
Mono<List<Integer>> m3 = Mono.just(Arrays.asList(new Integer[]{1, 2, 3}));
Mono.zip(m1, m2, m3)
        .map(tuple3->{
                List<Combined> c = new ArrayList<>();
                int size = tuple3.getT1().size();
                for ( int i=0; i < size; ++i ) {
                    c.add(new Combined(tuple3.getT1().get(i), tuple3.getT2().get(i), tuple3.getT3().get(i)));
                }
                return c;
        })
        .subscribe(System.out::println);
// [Combined(s=A, c=a, i=1), Combined(s=B, c=b, i=2), Combined(s=C, c=c, i=3)]

为了完整起见:

@Data
@AllArgsConstructor
class Combined {
    String s;
    Character c;
    Integer i;
}