如何在 Flux 中使用 zipWith 迭代元组?
How to iterate over a Tuple using zipWith in Flux?
我想遍历提供 referenceIds
的基本列表。然后在下一步中,我想进行多个服务调用并将结果聚合到一个元组中。
然后作为最后一步,我想遍历我的所有 referenceIds
,并且 return 每个直接通过事件流生成 JSONObject
。
@GetMapping(value = "/test", produces = TEXT_EVENT_STREAM_VALUE)
public Flux<JSONObject> test(Integer pages) {
pages = 1;
return Flux.range(1, pages)
.map(pageNumber -> Arrays.asList(1 * pageNumber, 2 * pageNumber, 3 * pageNumber)) //referenceIds for testing
.flatMap(numbers -> Flux.fromIterable(numbers).zipWith(Mono.zip(a(numbers), b(numbers)))) //results based on reference ids
.map(tuple -> {
Integer number = tuple.getT1();
Tuple2<List<String>, List<String>> lookup = tuple.getT2();
JSONObject json = new JSONObject();
json.put("number", number);
json.put("someMore", <fromLookup>);
return json;
});
}
对于此示例,return 类型的方法 a()
和 b()
无关紧要。重要的部分是:
如果我 return Flux.fromIterable(numbers);
一切正常。
但是当使用 .zipWith()
聚合时,我只收到数字列表的第一个元素。所有其他人都丢失了。为什么?
旁注:我需要使用 .zipWith()
并行执行这些方法调用(一些更长的 运行 事务)。
All others are lost. Why?
来自 Flux class 文档:
Zip this {@link Flux} with another {@link Publisher} source, that
is to say wait for both to emit one element and combine these
elements once into a {@link Tuple2}. The operator will continue
doing so until any of the sources completes.
您可以执行 a() 和 b() 调用,压缩结果,然后展开 numbers
列表到 flux 并像这样添加结果:
.flatMap(numbers -> Mono.zip(a(numbers), b(numbers))
.flatMapMany(tuple -> Flux.fromIterable(numbers).map(i -> Tuples.of(i,tuple))))
我想遍历提供 referenceIds
的基本列表。然后在下一步中,我想进行多个服务调用并将结果聚合到一个元组中。
然后作为最后一步,我想遍历我的所有 referenceIds
,并且 return 每个直接通过事件流生成 JSONObject
。
@GetMapping(value = "/test", produces = TEXT_EVENT_STREAM_VALUE)
public Flux<JSONObject> test(Integer pages) {
pages = 1;
return Flux.range(1, pages)
.map(pageNumber -> Arrays.asList(1 * pageNumber, 2 * pageNumber, 3 * pageNumber)) //referenceIds for testing
.flatMap(numbers -> Flux.fromIterable(numbers).zipWith(Mono.zip(a(numbers), b(numbers)))) //results based on reference ids
.map(tuple -> {
Integer number = tuple.getT1();
Tuple2<List<String>, List<String>> lookup = tuple.getT2();
JSONObject json = new JSONObject();
json.put("number", number);
json.put("someMore", <fromLookup>);
return json;
});
}
对于此示例,return 类型的方法 a()
和 b()
无关紧要。重要的部分是:
如果我 return Flux.fromIterable(numbers);
一切正常。
但是当使用 .zipWith()
聚合时,我只收到数字列表的第一个元素。所有其他人都丢失了。为什么?
旁注:我需要使用 .zipWith()
并行执行这些方法调用(一些更长的 运行 事务)。
All others are lost. Why?
来自 Flux class 文档:
Zip this {@link Flux} with another {@link Publisher} source, that is to say wait for both to emit one element and combine these elements once into a {@link Tuple2}. The operator will continue doing so until any of the sources completes.
您可以执行 a() 和 b() 调用,压缩结果,然后展开 numbers
列表到 flux 并像这样添加结果:
.flatMap(numbers -> Mono.zip(a(numbers), b(numbers))
.flatMapMany(tuple -> Flux.fromIterable(numbers).map(i -> Tuples.of(i,tuple))))