如何在 java 反应式中为 flux / mono 调用多个服务?

How to invoke multiple services for a flux / mono in java reactive?

我是响应式世界的新手,听起来可能是个新手,我有大量 20-30 大小的产品,对于每个产品,我需要从不同的微服务中获取以下内容:

  1. 平均评论数
  2. 总评论数
  3. 愿望清单数量
  4. 变体..
  5. .. 6 ..

我试过的..

1. doOnNext

Flux<Product> products = ...
products
.doOnNext(product -> updateReviewCount)
.doOnNext(product -> updateTotalCommentCount)
.doOnNext(product -> updateWishlistedCount)
.doOnNext(product -> updateVariants)
...

事实证明,每个产品的每次调用都会阻塞链..

e.g.
Total records(20) * No. of service calls(5) * Time per service calls(30 ms) = 3000ms 

但是时间会随着记录数增长||服务调用次数。

2。地图 使用我更新的地图和 returned 相同的参考,但结果相同。

3。收集所有作为列表并执行聚合查询到下游服务

Flux<Product> products = ...
products
.collectList() // Mono<List<Product>>
.doOnNext(productList -> updateReviewCountOfAllInList)
.doOnNext(productList -> updateFieldB_ForAllInList)
.doOnNext(productList -> updateFieldC_ForAllInList)
.doOnNext(productList -> updateFieldD_ForAllInList)
...

这确实提高了性能,虽然现在下游应用程序必须 return 更多数据用于查询,所以下游端的时间增加很少,但这没关系。

现在有了这个,我能够实现如下时间...... 总记录(合并为列表,因此 1)* 服务调用次数(5)* 每次服务调用的时间(随着时间增加 50 毫秒)= 250 毫秒

但是时间会随着服务调用次数的增加而增长。

现在我需要并行化这些服务调用并并行执行这些服务调用,并在同一产品实例(同一引用)上更新它们各自的字段。 一些像下面

Flux<Product> products = ... // of 10 products
products
.collectList() // Mono<List<Product>>
.doAllInParallel(serviceCall1, serviceCall2, serviceCall3...)

. // get all updated products // flux size of 10

有了这个我想达到时间... 250/5 = 50ms

如何实现? 我找到了不同的文章,但我不确定最好的方法是什么?有人可以帮我解决这个问题吗?

它使用

products // Flux<Product>
.collectList() // Mono<List<Product>>
.flatMap(list -> Mono.zip( this.call1(list) ,this.call2(list) ) ) // will return Tuple
.map(t -> t.getT1) 
.flatMapIterable(i->i)

Mono<Product> call1(List<Product> productList){
   // some logic
}
Mono<Product> call2(List<Product> productList){
   // some logic
}

实际上 zip 和 flatmapiterable ,也可以一步完成。这里只是为了演示。