Flow API 中的 subscription.request(n) 如何在任何 n 值下执行背压?

How does subscription.request(n) in Flow API perform backpressure at any n value?

我正在研究 Flow API,到目前为止,我了解到 request() 方法用于背压。大多数文章都说这类似于控制消费速度。

然而,几乎我看到的每个示例代码都将值 1 传递给 request() 方法,例如 subscription.request(1)。但是不太明白request()这个方法是怎么控制消费速度的。

我试图通过向发布者发送一堆项目并打印线程名称来 运行 进行测试,似乎每个 onNext() 都在 运行ning 上我使用的是同一个工作线程 request(1)request(50) :

@Override
public void onNext(T item) {
   System.out.println(Thread.getCurrent().getName());
   Thread.sleep(5000);
   subscription.request(50);
}

如果 onNext() 在不同的线程中 运行,我可以理解传递给 request(n)n 值将如何影响项目的速率正在并行处理(运行ning 在 n 线程中)。但在我的测试中似乎并非如此,因为它们都是 运行ning 在同一个线程名称下。

在这种情况下,request(1)request(50) 仍然会在同一个线程上依次 运行 有什么区别?那消费率不还是一样吗?

request 中的 n 表示订阅者可以接受多少个元素,并限制上游 Publisher 可以发出多少个元素。因此,此生成器的减慢不是针对单个项目,而是生成的每个批次的平均时间与消费者的处理时间交错。

onNext 以序列化方式执行,并且取决于上游,也在同一线程上执行。因此,在那里调用 request 通常表示上游它可以在当前调用结束后调用相同的 onNext,如果可用,则使用下一个值。即,调用 Thread.sleep 将推迟 onNext.

的下一次调用

通常,没有理由在 onNext 中调用 request 终端订阅者,因为它 运行 相对于其直接上游 Publisher 是同步的,并且那里单个 request(Long.MAX_VALUE) 和重复 request(1).

之间没有实际区别

调用 request 的少数几个原因之一,如果 onNext 分叉异步工作本身,并且只有在该工作结束时才应请求更多项目:

Executor executor = ...

Subscription upstream;

@Override public void onSubscribe(Subscription s) {
    this.upstream = s;
    executor.execute(() -> {
       Thread.sleep(5000);
       s.request(1);
       return null; // Callable
    });
}

@Override public void onNext(T item) {
    System.out.println("Start onNext");
    executor.execute(() -> {
       System.out.println("Run work");
       Thread.sleep(5000);
       System.out.println("Request more work");
       upstream.request(1);
       return null; // Callable
    });
    System.out.println("End onNext");
}

使用此设置,上游将调用一次onNext,并且仅当执行者执行的任务发出下一个请求时才会调用它。请注意,除非 Publisher 从专用线程发出,否则上面的示例最终会将 onNext 调用拖到 executor 的线程上。

onNext() 的调用不应 运行 并行。他们可以 运行 来自不同的线程(取决于实现),但总是顺序的。但即使按顺序,它们也可以以高于订户可以处理的速率被调用。因此订阅者仅在有空间容纳 n 个传入项目时才调用 request(n)。通常它只有一个值的空间,所以当这个变量空闲时它调用 request(1)