如何在 Flux 中使用 zipWith 迭代元组?

How to iterate over a Tuple using zipWith in Flux?

我想遍历提供 referenceIds 的基本列表。然后在下一步中,我想进行多个服务调用并将结果聚合到一个元组中。

然后作为最后一步,我想遍历我的所有 referenceIds,并且 return 每个直接通过事件流生成 JSONObject

@GetMapping(value = "/test", produces = TEXT_EVENT_STREAM_VALUE)
public Flux<JSONObject> test(Integer pages) {
    pages = 1;
    return Flux.range(1, pages)
            .map(pageNumber -> Arrays.asList(1 * pageNumber, 2 * pageNumber, 3 * pageNumber)) //referenceIds for testing
            .flatMap(numbers -> Flux.fromIterable(numbers).zipWith(Mono.zip(a(numbers), b(numbers)))) //results based on reference ids
            .map(tuple -> {
                Integer number = tuple.getT1();
                Tuple2<List<String>, List<String>> lookup = tuple.getT2();

                JSONObject json = new JSONObject();
                json.put("number", number);
                json.put("someMore", <fromLookup>);
                return json;
            });
}

对于此示例,return 类型的方法 a()b() 无关紧要。重要的部分是:

如果我 return Flux.fromIterable(numbers); 一切正常。 但是当使用 .zipWith() 聚合时,我只收到数字列表的第一个元素。所有其他人都丢失了。为什么?

旁注:我需要使用 .zipWith() 并行执行这些方法调用(一些更长的 运行 事务)。

All others are lost. Why?

来自 Flux class 文档:

Zip this {@link Flux} with another {@link Publisher} source, that is to say wait for both to emit one element and combine these elements once into a {@link Tuple2}. The operator will continue doing so until any of the sources completes.

您可以执行 a() 和 b() 调用,压缩结果,然后展开 numbers 列表到 flux 并像这样添加结果:

   .flatMap(numbers -> Mono.zip(a(numbers), b(numbers))
                .flatMapMany(tuple -> Flux.fromIterable(numbers).map(i -> Tuples.of(i,tuple))))