如何在 java 反应式中为 flux / mono 调用多个服务?
How to invoke multiple services for a flux / mono in java reactive?
我是响应式世界的新手,听起来可能是个新手,我有大量 20-30 大小的产品,对于每个产品,我需要从不同的微服务中获取以下内容:
- 平均评论数
- 总评论数
- 愿望清单数量
- 变体..
- ..
6 ..
我试过的..
1. doOnNext
Flux<Product> products = ...
products
.doOnNext(product -> updateReviewCount)
.doOnNext(product -> updateTotalCommentCount)
.doOnNext(product -> updateWishlistedCount)
.doOnNext(product -> updateVariants)
...
事实证明,每个产品的每次调用都会阻塞链..
e.g.
Total records(20) * No. of service calls(5) * Time per service calls(30 ms) = 3000ms
但是时间会随着记录数增长||服务调用次数。
2。地图
使用我更新的地图和 returned 相同的参考,但结果相同。
3。收集所有作为列表并执行聚合查询到下游服务
Flux<Product> products = ...
products
.collectList() // Mono<List<Product>>
.doOnNext(productList -> updateReviewCountOfAllInList)
.doOnNext(productList -> updateFieldB_ForAllInList)
.doOnNext(productList -> updateFieldC_ForAllInList)
.doOnNext(productList -> updateFieldD_ForAllInList)
...
这确实提高了性能,虽然现在下游应用程序必须 return 更多数据用于查询,所以下游端的时间增加很少,但这没关系。
现在有了这个,我能够实现如下时间......
总记录(合并为列表,因此 1)* 服务调用次数(5)* 每次服务调用的时间(随着时间增加 50 毫秒)= 250 毫秒
但是时间会随着服务调用次数的增加而增长。
现在我需要并行化这些服务调用并并行执行这些服务调用,并在同一产品实例(同一引用)上更新它们各自的字段。
一些像下面
Flux<Product> products = ... // of 10 products
products
.collectList() // Mono<List<Product>>
.doAllInParallel(serviceCall1, serviceCall2, serviceCall3...)
. // get all updated products // flux size of 10
有了这个我想达到时间... 250/5 = 50ms
如何实现?
我找到了不同的文章,但我不确定最好的方法是什么?有人可以帮我解决这个问题吗?
它使用
products // Flux<Product>
.collectList() // Mono<List<Product>>
.flatMap(list -> Mono.zip( this.call1(list) ,this.call2(list) ) ) // will return Tuple
.map(t -> t.getT1)
.flatMapIterable(i->i)
Mono<Product> call1(List<Product> productList){
// some logic
}
Mono<Product> call2(List<Product> productList){
// some logic
}
实际上 zip 和 flatmapiterable ,也可以一步完成。这里只是为了演示。
我是响应式世界的新手,听起来可能是个新手,我有大量 20-30 大小的产品,对于每个产品,我需要从不同的微服务中获取以下内容:
- 平均评论数
- 总评论数
- 愿望清单数量
- 变体..
- .. 6 ..
我试过的..
1. doOnNext
Flux<Product> products = ...
products
.doOnNext(product -> updateReviewCount)
.doOnNext(product -> updateTotalCommentCount)
.doOnNext(product -> updateWishlistedCount)
.doOnNext(product -> updateVariants)
...
事实证明,每个产品的每次调用都会阻塞链..
e.g.
Total records(20) * No. of service calls(5) * Time per service calls(30 ms) = 3000ms
但是时间会随着记录数增长||服务调用次数。
2。地图 使用我更新的地图和 returned 相同的参考,但结果相同。
3。收集所有作为列表并执行聚合查询到下游服务
Flux<Product> products = ...
products
.collectList() // Mono<List<Product>>
.doOnNext(productList -> updateReviewCountOfAllInList)
.doOnNext(productList -> updateFieldB_ForAllInList)
.doOnNext(productList -> updateFieldC_ForAllInList)
.doOnNext(productList -> updateFieldD_ForAllInList)
...
这确实提高了性能,虽然现在下游应用程序必须 return 更多数据用于查询,所以下游端的时间增加很少,但这没关系。
现在有了这个,我能够实现如下时间...... 总记录(合并为列表,因此 1)* 服务调用次数(5)* 每次服务调用的时间(随着时间增加 50 毫秒)= 250 毫秒
但是时间会随着服务调用次数的增加而增长。
现在我需要并行化这些服务调用并并行执行这些服务调用,并在同一产品实例(同一引用)上更新它们各自的字段。 一些像下面
Flux<Product> products = ... // of 10 products
products
.collectList() // Mono<List<Product>>
.doAllInParallel(serviceCall1, serviceCall2, serviceCall3...)
. // get all updated products // flux size of 10
有了这个我想达到时间... 250/5 = 50ms
如何实现? 我找到了不同的文章,但我不确定最好的方法是什么?有人可以帮我解决这个问题吗?
它使用
products // Flux<Product>
.collectList() // Mono<List<Product>>
.flatMap(list -> Mono.zip( this.call1(list) ,this.call2(list) ) ) // will return Tuple
.map(t -> t.getT1)
.flatMapIterable(i->i)
Mono<Product> call1(List<Product> productList){
// some logic
}
Mono<Product> call2(List<Product> productList){
// some logic
}
实际上 zip 和 flatmapiterable ,也可以一步完成。这里只是为了演示。