为什么一些 Reactor 操作员请求的元素比他们感兴趣的多得多?

Why do some Reactor operators request far more elements than they are interested in?

我有以下代码:

Flux<String> flux = Flux.<String>never()
        .doOnRequest(n -> System.out.println("Requested " + n));

它是一个从不发出任何信号,但向控制台报告需求的 Flux。

以下各 3 行

flux.take(3).next().block();
flux.next().block();
flux.blockFirst();

产生这个输出:

Requested 9223372036854775807

查看代码,我看到以下内容。

BlockingSingleSubscriber(适用于 Flux#blockFirst()Mono#block():

public final void onSubscribe(Subscription s) {
    this.s = s;
    if (!cancelled) {
        s.request(Long.MAX_VALUE);
    }
}

MonoNext.NextSubscriber:

public void request(long n) {
    if (WIP.compareAndSet(this, 0, 1)) {
        s.request(Long.MAX_VALUE);
    }
}

FluxTake.TakeSubscriber:

public void request(long n) {
    if (wip == 0 && WIP.compareAndSet(this, 0, 1)) {
        if (n >= this.n) {
            s.request(Long.MAX_VALUE);
        } else {
            s.request(n);
        }
        return;
    }

    s.request(n);
}

所以Flux#blockFirst()Flux#next()Mono#block()总是向他们的上游发出无限制的需求信号,Flux#take()在某些情况下也可以这样做。

但是 Flux#blockFirst()Flux#next()Mono#block() 每个最多需要来自其上游的一个元素,而 Flux#take() 最多需要 this.n.

此外,Flux#take() javadoc 说明如下:

Note that this operator doesn't manipulate the backpressure requested amount. Rather, it merely lets requests from downstream propagate as is and cancels once N elements have been emitted. As a result, the source could produce a lot of extraneous elements in the meantime. If that behavior is undesirable and you do not own the request from downstream (e.g. prefetching operators), consider using {@link #limitRequest(long)} instead.

问题是:当他们预先知道限制时,为什么他们会发出无限制的需求信号?我的印象是反应性背压只是询问你准备消费的东西。但现实中往往是这样的:向上游喊'produce all you can',满意了就取消订阅。在上游产生大量记录的成本很高的情况下,这看起来简直是浪费。

tl;dr - 只请求你需要的东西在基于拉的系统中通常是理想的,但在基于推的系统中很少是理想的。

I had an impression that reactive backpressure was about only asking for what you are ready to consume.

不完全是,这是您能够消费的东西。差别很小,但很重要。

在基于拉动的系统中,您是完全正确的 - 请求比您所知道的更多的值几乎永远不会是一件好事,因为您请求的值越多,需要进行的工作就越多产生这些价值。

但请注意,反应流本质上是基于推送的,而不是基于拉取的。大多数反应式框架,包括反应器,都是在考虑这一点的情况下构建的——虽然混合或基于拉动的语义是可能的(例如,使用 Flux.generate() 按需一次生成一个元素),这在很大程度上是一种二次使用案件。规范是让发布者拥有大量需要卸载的数据,并且它“希望”尽快将其推送给您以摆脱它。

这很重要,因为它从请求的角度颠覆了关于什么是理想的观点。它不再成为“我最需要的是什么”的问题,而是“我能处理的最多的是什么”——数字越大越好。

举个例子,假设我有一个数据库查询返回 2000 条连接到通量的记录 - 但我只想要 1 条。如果我有一个推送这 2000 条记录的发布者,我调用 request(1),那么我根本就不是在“帮助”事情——我没有在数据库端造成更少的处理,那些记录已经在那里等待了。但是,由于我只请求了 1,因此发布者必须决定是否可以缓冲剩余的记录,或者最好跳过部分或全部记录,或者如果它不能跟上,则应该抛出异常,或者其他完全。不管它做什么,我实际上导致了 更多 的工作,在某些情况下甚至可能是一个例外,通过请求 更少 条记录。

当然,这并不总是可取 - 也许 Flux 中的那些额外元素确实会导致额外的处理,这是浪费的,也许网络带宽是一个主要问题,等等。在那在这种情况下,您可能希望显式调用 limitRequest()。但在大多数情况下,这可能不是您想要的行为。

(为了完整起见,best 方案当然是在源头限制数据 - 如果您只想要一个,请在数据库查询中添加 LIMIT 1例如值。那么您就不必担心这些东西。但是,当然,在现实世界中,这并不总是可能的。)