为什么一些 Reactor 操作员请求的元素比他们感兴趣的多得多?
Why do some Reactor operators request far more elements than they are interested in?
我有以下代码:
Flux<String> flux = Flux.<String>never()
.doOnRequest(n -> System.out.println("Requested " + n));
它是一个从不发出任何信号,但向控制台报告需求的 Flux。
以下各 3 行
flux.take(3).next().block();
flux.next().block();
flux.blockFirst();
产生这个输出:
Requested 9223372036854775807
查看代码,我看到以下内容。
BlockingSingleSubscriber
(适用于 Flux#blockFirst()
和 Mono#block()
:
public final void onSubscribe(Subscription s) {
this.s = s;
if (!cancelled) {
s.request(Long.MAX_VALUE);
}
}
MonoNext.NextSubscriber
:
public void request(long n) {
if (WIP.compareAndSet(this, 0, 1)) {
s.request(Long.MAX_VALUE);
}
}
FluxTake.TakeSubscriber
:
public void request(long n) {
if (wip == 0 && WIP.compareAndSet(this, 0, 1)) {
if (n >= this.n) {
s.request(Long.MAX_VALUE);
} else {
s.request(n);
}
return;
}
s.request(n);
}
所以Flux#blockFirst()
、Flux#next()
和Mono#block()
总是向他们的上游发出无限制的需求信号,Flux#take()
在某些情况下也可以这样做。
但是 Flux#blockFirst()
、Flux#next()
和 Mono#block()
每个最多需要来自其上游的一个元素,而 Flux#take()
最多需要 this.n.
此外,Flux#take()
javadoc 说明如下:
Note that this operator doesn't manipulate the backpressure requested amount.
Rather, it merely lets requests from downstream propagate as is and cancels once
N elements have been emitted. As a result, the source could produce a lot of
extraneous elements in the meantime. If that behavior is undesirable and you do
not own the request from downstream (e.g. prefetching operators), consider
using {@link #limitRequest(long)} instead.
问题是:当他们预先知道限制时,为什么他们会发出无限制的需求信号?我的印象是反应性背压只是询问你准备消费的东西。但现实中往往是这样的:向上游喊'produce all you can',满意了就取消订阅。在上游产生大量记录的成本很高的情况下,这看起来简直是浪费。
tl;dr - 只请求你需要的东西在基于拉的系统中通常是理想的,但在基于推的系统中很少是理想的。
I had an impression that reactive backpressure was about only asking for what you are ready to consume.
不完全是,这是您能够消费的东西。差别很小,但很重要。
在基于拉动的系统中,您是完全正确的 - 请求比您所知道的更多的值几乎永远不会是一件好事,因为您请求的值越多,需要进行的工作就越多产生这些价值。
但请注意,反应流本质上是基于推送的,而不是基于拉取的。大多数反应式框架,包括反应器,都是在考虑这一点的情况下构建的——虽然混合或基于拉动的语义是可能的(例如,使用 Flux.generate()
按需一次生成一个元素),这在很大程度上是一种二次使用案件。规范是让发布者拥有大量需要卸载的数据,并且它“希望”尽快将其推送给您以摆脱它。
这很重要,因为它从请求的角度颠覆了关于什么是理想的观点。它不再成为“我最需要的是什么”的问题,而是“我能处理的最多的是什么”——数字越大越好。
举个例子,假设我有一个数据库查询返回 2000 条连接到通量的记录 - 但我只想要 1 条。如果我有一个推送这 2000 条记录的发布者,我调用 request(1)
,那么我根本就不是在“帮助”事情——我没有在数据库端造成更少的处理,那些记录已经在那里等待了。但是,由于我只请求了 1,因此发布者必须决定是否可以缓冲剩余的记录,或者最好跳过部分或全部记录,或者如果它不能跟上,则应该抛出异常,或者其他完全。不管它做什么,我实际上导致了 更多 的工作,在某些情况下甚至可能是一个例外,通过请求 更少 条记录。
当然,这并不总是可取 - 也许 Flux 中的那些额外元素确实会导致额外的处理,这是浪费的,也许网络带宽是一个主要问题,等等。在那在这种情况下,您可能希望显式调用 limitRequest()
。但在大多数情况下,这可能不是您想要的行为。
(为了完整起见,best 方案当然是在源头限制数据 - 如果您只想要一个,请在数据库查询中添加 LIMIT 1
例如值。那么您就不必担心这些东西。但是,当然,在现实世界中,这并不总是可能的。)
我有以下代码:
Flux<String> flux = Flux.<String>never()
.doOnRequest(n -> System.out.println("Requested " + n));
它是一个从不发出任何信号,但向控制台报告需求的 Flux。
以下各 3 行
flux.take(3).next().block();
flux.next().block();
flux.blockFirst();
产生这个输出:
Requested 9223372036854775807
查看代码,我看到以下内容。
BlockingSingleSubscriber
(适用于 Flux#blockFirst()
和 Mono#block()
:
public final void onSubscribe(Subscription s) {
this.s = s;
if (!cancelled) {
s.request(Long.MAX_VALUE);
}
}
MonoNext.NextSubscriber
:
public void request(long n) {
if (WIP.compareAndSet(this, 0, 1)) {
s.request(Long.MAX_VALUE);
}
}
FluxTake.TakeSubscriber
:
public void request(long n) {
if (wip == 0 && WIP.compareAndSet(this, 0, 1)) {
if (n >= this.n) {
s.request(Long.MAX_VALUE);
} else {
s.request(n);
}
return;
}
s.request(n);
}
所以Flux#blockFirst()
、Flux#next()
和Mono#block()
总是向他们的上游发出无限制的需求信号,Flux#take()
在某些情况下也可以这样做。
但是 Flux#blockFirst()
、Flux#next()
和 Mono#block()
每个最多需要来自其上游的一个元素,而 Flux#take()
最多需要 this.n.
此外,Flux#take()
javadoc 说明如下:
Note that this operator doesn't manipulate the backpressure requested amount. Rather, it merely lets requests from downstream propagate as is and cancels once N elements have been emitted. As a result, the source could produce a lot of extraneous elements in the meantime. If that behavior is undesirable and you do not own the request from downstream (e.g. prefetching operators), consider using {@link #limitRequest(long)} instead.
问题是:当他们预先知道限制时,为什么他们会发出无限制的需求信号?我的印象是反应性背压只是询问你准备消费的东西。但现实中往往是这样的:向上游喊'produce all you can',满意了就取消订阅。在上游产生大量记录的成本很高的情况下,这看起来简直是浪费。
tl;dr - 只请求你需要的东西在基于拉的系统中通常是理想的,但在基于推的系统中很少是理想的。
I had an impression that reactive backpressure was about only asking for what you are ready to consume.
不完全是,这是您能够消费的东西。差别很小,但很重要。
在基于拉动的系统中,您是完全正确的 - 请求比您所知道的更多的值几乎永远不会是一件好事,因为您请求的值越多,需要进行的工作就越多产生这些价值。
但请注意,反应流本质上是基于推送的,而不是基于拉取的。大多数反应式框架,包括反应器,都是在考虑这一点的情况下构建的——虽然混合或基于拉动的语义是可能的(例如,使用 Flux.generate()
按需一次生成一个元素),这在很大程度上是一种二次使用案件。规范是让发布者拥有大量需要卸载的数据,并且它“希望”尽快将其推送给您以摆脱它。
这很重要,因为它从请求的角度颠覆了关于什么是理想的观点。它不再成为“我最需要的是什么”的问题,而是“我能处理的最多的是什么”——数字越大越好。
举个例子,假设我有一个数据库查询返回 2000 条连接到通量的记录 - 但我只想要 1 条。如果我有一个推送这 2000 条记录的发布者,我调用 request(1)
,那么我根本就不是在“帮助”事情——我没有在数据库端造成更少的处理,那些记录已经在那里等待了。但是,由于我只请求了 1,因此发布者必须决定是否可以缓冲剩余的记录,或者最好跳过部分或全部记录,或者如果它不能跟上,则应该抛出异常,或者其他完全。不管它做什么,我实际上导致了 更多 的工作,在某些情况下甚至可能是一个例外,通过请求 更少 条记录。
当然,这并不总是可取 - 也许 Flux 中的那些额外元素确实会导致额外的处理,这是浪费的,也许网络带宽是一个主要问题,等等。在那在这种情况下,您可能希望显式调用 limitRequest()
。但在大多数情况下,这可能不是您想要的行为。
(为了完整起见,best 方案当然是在源头限制数据 - 如果您只想要一个,请在数据库查询中添加 LIMIT 1
例如值。那么您就不必担心这些东西。但是,当然,在现实世界中,这并不总是可能的。)