项目 Reactor 中的并行 Flux 与 Flux
Parallel Flux vs Flux in project Reactor
所以我从文档中了解到,并行 Flux 本质上将通量元素划分为单独的 rails。(本质上类似于分组)。就线程而言,这将是调度程序的工作。因此,让我们考虑这样的情况。所有这些都将 运行 在通过 运行On() 方法提供的同一调度程序实例上。
让我们考虑如下情况:
Mono<Response> = webClientCallAPi(..) //function returning Mono from webclient call
现在假设我们进行了大约 100 次调用
Flux.range(0,100).subscribeOn(Schedulers.boundedElastic()).flatMap(i -> webClientCallApi(i)).collecttoList() // or subscribe somehow
如果我们使用 paralleFlux:
Flux.range(0,100).parallel().runOn(Schedulers.boundedElastic()).flatMap(i -> webClientCallApi(i)).sequential().collecttoList();
所以如果我的理解是正确的,它看起来很相似。那么 ParallelFlux 相对于 Flux 的优势是什么?什么时候应该使用 parallelFlux 而不是 flux?
在实践中,您可能很少需要使用并联磁通,包括在此示例中。
在您的示例中,您将触发 100 个网络服务调用。请记住,执行此操作所需的实际 work 非常低 - 您生成并触发异步请求,然后一段时间后您会收到响应。在请求和响应之间你根本没有做任何工作,它只需要 tiny 数量的 CPU 资源,当每个请求被发送时,另一个 tiny 关于收到每个响应的时间。 (这是使用异步框架发出 Web 请求的核心优势之一,您不会在请求进行中占用任何线程。)
如果你将这个流量和 运行 并行拆分,你是说你想要拆分这些少量的 CPU 资源,以便它们可以同时 运行,在不同的 CPU 核心上。这完全没有意义——拆分通量的开销,运行将其并行处理,然后再将其组合起来,比让它在普通的顺序调度程序上执行要大得多。
另一方面,假设我有一个 Flux<Integer>
,我想检查这些整数中的每一个是否都是素数 - 或者可能是我想要的 Flux<String>
个密码检查 BCrypt 散列。这些类型的操作 确实 CPU 密集,因此在这种情况下,用于跨内核拆分执行的并行通量可能很有意义。但实际上,这些情况在正常的反应器用例中很少发生。
(另外,作为结束语,您几乎总是希望将 Schedulers.parallel()
与平行通量一起使用,而不是 Schedulers.boundedElastic()
。)
所以我从文档中了解到,并行 Flux 本质上将通量元素划分为单独的 rails。(本质上类似于分组)。就线程而言,这将是调度程序的工作。因此,让我们考虑这样的情况。所有这些都将 运行 在通过 运行On() 方法提供的同一调度程序实例上。 让我们考虑如下情况:
Mono<Response> = webClientCallAPi(..) //function returning Mono from webclient call
现在假设我们进行了大约 100 次调用
Flux.range(0,100).subscribeOn(Schedulers.boundedElastic()).flatMap(i -> webClientCallApi(i)).collecttoList() // or subscribe somehow
如果我们使用 paralleFlux:
Flux.range(0,100).parallel().runOn(Schedulers.boundedElastic()).flatMap(i -> webClientCallApi(i)).sequential().collecttoList();
所以如果我的理解是正确的,它看起来很相似。那么 ParallelFlux 相对于 Flux 的优势是什么?什么时候应该使用 parallelFlux 而不是 flux?
在实践中,您可能很少需要使用并联磁通,包括在此示例中。
在您的示例中,您将触发 100 个网络服务调用。请记住,执行此操作所需的实际 work 非常低 - 您生成并触发异步请求,然后一段时间后您会收到响应。在请求和响应之间你根本没有做任何工作,它只需要 tiny 数量的 CPU 资源,当每个请求被发送时,另一个 tiny 关于收到每个响应的时间。 (这是使用异步框架发出 Web 请求的核心优势之一,您不会在请求进行中占用任何线程。)
如果你将这个流量和 运行 并行拆分,你是说你想要拆分这些少量的 CPU 资源,以便它们可以同时 运行,在不同的 CPU 核心上。这完全没有意义——拆分通量的开销,运行将其并行处理,然后再将其组合起来,比让它在普通的顺序调度程序上执行要大得多。
另一方面,假设我有一个 Flux<Integer>
,我想检查这些整数中的每一个是否都是素数 - 或者可能是我想要的 Flux<String>
个密码检查 BCrypt 散列。这些类型的操作 确实 CPU 密集,因此在这种情况下,用于跨内核拆分执行的并行通量可能很有意义。但实际上,这些情况在正常的反应器用例中很少发生。
(另外,作为结束语,您几乎总是希望将 Schedulers.parallel()
与平行通量一起使用,而不是 Schedulers.boundedElastic()
。)