Reactor 中的提取器如何阻塞(在性能方面)
How the extractors in Reactor blocking (in terms of performance)
我注意到不建议使用此类提取器 Mono.toFuture()
、Flux.collectList()
,因为它们会阻塞流量。
我不是很确定'blocking'在哪个方向。就像下面的代码一样,我知道 Flux.collectList()
会等待所有项目完成,它会像某个线程一样继续等待,还是只是最后一个完成 .collectList()
事情的线程?
听说Mono.toFuture()
也会阻塞,会不会马上return一个'future'(以后在onNext()
或[=17时就可以用了) =] 发生),或者它将 return 直到 onNext()
或 onComplete()
发生?
var m = Flux.range(0, 100)
.parallel()
.runOn(Schedulers.boundedElastic())
.map(i -> Mono.fromFuture(
Mono.just(i).map(n -> {
try {
var s = (long) (Math.random() * 100);
Thread.sleep(s);
System.out.println(Thread.currentThread() + "after " + s + "ms awaking: " + n);
} catch (InterruptedException e) {
e.printStackTrace();
}
return n;
}).toFuture())
)
.doOnNext(o -> System.out.println(Thread.currentThread() + "before sequential"))
.sequential();
var mm = Flux.merge(m)
.doOnNext(o -> System.out.println(Thread.currentThread() + "before collecting"))
.collectList()
.doOnNext(o -> System.out.println(Thread.currentThread() + "before map"))
.map(list -> list.stream().map(i -> i).collect(Collectors.toList()))
.publishOn(Schedulers.single())
.toFuture();
你的假设不太正确。
Mono.toFuture() 根本不阻塞——它只是 returns 一个 CompleteableFuture
,你可以阻塞它(如果你调用它的 get()
方法)或异步执行(如果你使用它的任何异步方法,如 thenApply()
、thenCompose()
等)你会脱离反应器上下文,因此会放弃诸如背压之类的东西,但你不必立即块。
您可能正在考虑(非常)旧版本的 reactor,我相信其中有一个 toFuture()
变体返回 Future
,而不是 CompleteableFuture
- 并且虽然这也没有阻塞,但它使您处于 必须 然后阻塞的上下文中,因为 Future
没有异步组件。因此,虽然方法调用本身没有阻塞,但这是您当时唯一的选择。
与流行的看法相反 Flux.collectList()
也 不是 阻塞 - 它特别是 returns 一个 Mono<List<T>>
,这是一个非阻塞将发出单个元素的发布者,该元素是该流量中所有内容的列表。当然,您可以在此发布者上调用 block()
,该操作会阻塞 - 但调用 collectList()
本身并不比任何其他运算符更阻塞。
话虽如此,它肯定 会 引起问题。由于它正在做的事情的性质(将所有元素从一个 flux 收集到内存中的一个列表中),它可能并不理想:
- 您可能需要等待很长时间才能发出列表,没有关于它包含多少元素的反馈,或者它是否被填充;
- 如果 flux 中的元素数量或元素大小特别大,您可能 运行 内存不足;
- 您无法在添加元素时输出任何中间状态,因此您会失去诸如流 JSON 支持之类的东西。
这并不能使它 阻塞 然而,这只是意味着在决定它是否是一个值得在您的应用程序中使用的运算符之前,您需要权衡一组不同的潜在问题特定场景。
我注意到不建议使用此类提取器 Mono.toFuture()
、Flux.collectList()
,因为它们会阻塞流量。
我不是很确定'blocking'在哪个方向。就像下面的代码一样,我知道 Flux.collectList()
会等待所有项目完成,它会像某个线程一样继续等待,还是只是最后一个完成 .collectList()
事情的线程?
听说Mono.toFuture()
也会阻塞,会不会马上return一个'future'(以后在onNext()
或[=17时就可以用了) =] 发生),或者它将 return 直到 onNext()
或 onComplete()
发生?
var m = Flux.range(0, 100)
.parallel()
.runOn(Schedulers.boundedElastic())
.map(i -> Mono.fromFuture(
Mono.just(i).map(n -> {
try {
var s = (long) (Math.random() * 100);
Thread.sleep(s);
System.out.println(Thread.currentThread() + "after " + s + "ms awaking: " + n);
} catch (InterruptedException e) {
e.printStackTrace();
}
return n;
}).toFuture())
)
.doOnNext(o -> System.out.println(Thread.currentThread() + "before sequential"))
.sequential();
var mm = Flux.merge(m)
.doOnNext(o -> System.out.println(Thread.currentThread() + "before collecting"))
.collectList()
.doOnNext(o -> System.out.println(Thread.currentThread() + "before map"))
.map(list -> list.stream().map(i -> i).collect(Collectors.toList()))
.publishOn(Schedulers.single())
.toFuture();
你的假设不太正确。
Mono.toFuture() 根本不阻塞——它只是 returns 一个 CompleteableFuture
,你可以阻塞它(如果你调用它的 get()
方法)或异步执行(如果你使用它的任何异步方法,如 thenApply()
、thenCompose()
等)你会脱离反应器上下文,因此会放弃诸如背压之类的东西,但你不必立即块。
您可能正在考虑(非常)旧版本的 reactor,我相信其中有一个 toFuture()
变体返回 Future
,而不是 CompleteableFuture
- 并且虽然这也没有阻塞,但它使您处于 必须 然后阻塞的上下文中,因为 Future
没有异步组件。因此,虽然方法调用本身没有阻塞,但这是您当时唯一的选择。
与流行的看法相反 Flux.collectList()
也 不是 阻塞 - 它特别是 returns 一个 Mono<List<T>>
,这是一个非阻塞将发出单个元素的发布者,该元素是该流量中所有内容的列表。当然,您可以在此发布者上调用 block()
,该操作会阻塞 - 但调用 collectList()
本身并不比任何其他运算符更阻塞。
话虽如此,它肯定 会 引起问题。由于它正在做的事情的性质(将所有元素从一个 flux 收集到内存中的一个列表中),它可能并不理想:
- 您可能需要等待很长时间才能发出列表,没有关于它包含多少元素的反馈,或者它是否被填充;
- 如果 flux 中的元素数量或元素大小特别大,您可能 运行 内存不足;
- 您无法在添加元素时输出任何中间状态,因此您会失去诸如流 JSON 支持之类的东西。
这并不能使它 阻塞 然而,这只是意味着在决定它是否是一个值得在您的应用程序中使用的运算符之前,您需要权衡一组不同的潜在问题特定场景。