Flow API 中的 subscription.request(n) 如何在任何 n 值下执行背压?
How does subscription.request(n) in Flow API perform backpressure at any n value?
我正在研究 Flow API,到目前为止,我了解到 request()
方法用于背压。大多数文章都说这类似于控制消费速度。
然而,几乎我看到的每个示例代码都将值 1
传递给 request()
方法,例如 subscription.request(1)
。但是不太明白request()
这个方法是怎么控制消费速度的。
我试图通过向发布者发送一堆项目并打印线程名称来 运行 进行测试,似乎每个 onNext()
都在 运行ning 上我使用的是同一个工作线程 request(1)
或 request(50)
:
@Override
public void onNext(T item) {
System.out.println(Thread.getCurrent().getName());
Thread.sleep(5000);
subscription.request(50);
}
如果 onNext()
在不同的线程中 运行,我可以理解传递给 request(n)
的 n
值将如何影响项目的速率正在并行处理(运行ning 在 n
线程中)。但在我的测试中似乎并非如此,因为它们都是 运行ning 在同一个线程名称下。
在这种情况下,request(1)
和 request(50)
仍然会在同一个线程上依次 运行 有什么区别?那消费率不还是一样吗?
request
中的 n
表示订阅者可以接受多少个元素,并限制上游 Publisher
可以发出多少个元素。因此,此生成器的减慢不是针对单个项目,而是生成的每个批次的平均时间与消费者的处理时间交错。
onNext
以序列化方式执行,并且取决于上游,也在同一线程上执行。因此,在那里调用 request
通常表示上游它可以在当前调用结束后调用相同的 onNext
、,如果可用,则使用下一个值。即,调用 Thread.sleep
将推迟 onNext
.
的下一次调用
通常,没有理由在 onNext
中调用 request
终端订阅者,因为它 运行 相对于其直接上游 Publisher
是同步的,并且那里单个 request(Long.MAX_VALUE)
和重复 request(1)
.
之间没有实际区别
调用 request
的少数几个原因之一,如果 onNext
分叉异步工作本身,并且只有在该工作结束时才应请求更多项目:
Executor executor = ...
Subscription upstream;
@Override public void onSubscribe(Subscription s) {
this.upstream = s;
executor.execute(() -> {
Thread.sleep(5000);
s.request(1);
return null; // Callable
});
}
@Override public void onNext(T item) {
System.out.println("Start onNext");
executor.execute(() -> {
System.out.println("Run work");
Thread.sleep(5000);
System.out.println("Request more work");
upstream.request(1);
return null; // Callable
});
System.out.println("End onNext");
}
使用此设置,上游将调用一次onNext
,并且仅当执行者执行的任务发出下一个请求时才会调用它。请注意,除非 Publisher
从专用线程发出,否则上面的示例最终会将 onNext
调用拖到 executor
的线程上。
onNext() 的调用不应 运行 并行。他们可以 运行 来自不同的线程(取决于实现),但总是顺序的。但即使按顺序,它们也可以以高于订户可以处理的速率被调用。因此订阅者仅在有空间容纳 n 个传入项目时才调用 request(n)。通常它只有一个值的空间,所以当这个变量空闲时它调用 request(1)。
我正在研究 Flow API,到目前为止,我了解到 request()
方法用于背压。大多数文章都说这类似于控制消费速度。
然而,几乎我看到的每个示例代码都将值 1
传递给 request()
方法,例如 subscription.request(1)
。但是不太明白request()
这个方法是怎么控制消费速度的。
我试图通过向发布者发送一堆项目并打印线程名称来 运行 进行测试,似乎每个 onNext()
都在 运行ning 上我使用的是同一个工作线程 request(1)
或 request(50)
:
@Override
public void onNext(T item) {
System.out.println(Thread.getCurrent().getName());
Thread.sleep(5000);
subscription.request(50);
}
如果 onNext()
在不同的线程中 运行,我可以理解传递给 request(n)
的 n
值将如何影响项目的速率正在并行处理(运行ning 在 n
线程中)。但在我的测试中似乎并非如此,因为它们都是 运行ning 在同一个线程名称下。
在这种情况下,request(1)
和 request(50)
仍然会在同一个线程上依次 运行 有什么区别?那消费率不还是一样吗?
request
中的 n
表示订阅者可以接受多少个元素,并限制上游 Publisher
可以发出多少个元素。因此,此生成器的减慢不是针对单个项目,而是生成的每个批次的平均时间与消费者的处理时间交错。
onNext
以序列化方式执行,并且取决于上游,也在同一线程上执行。因此,在那里调用 request
通常表示上游它可以在当前调用结束后调用相同的 onNext
、,如果可用,则使用下一个值。即,调用 Thread.sleep
将推迟 onNext
.
通常,没有理由在 onNext
中调用 request
终端订阅者,因为它 运行 相对于其直接上游 Publisher
是同步的,并且那里单个 request(Long.MAX_VALUE)
和重复 request(1)
.
调用 request
的少数几个原因之一,如果 onNext
分叉异步工作本身,并且只有在该工作结束时才应请求更多项目:
Executor executor = ...
Subscription upstream;
@Override public void onSubscribe(Subscription s) {
this.upstream = s;
executor.execute(() -> {
Thread.sleep(5000);
s.request(1);
return null; // Callable
});
}
@Override public void onNext(T item) {
System.out.println("Start onNext");
executor.execute(() -> {
System.out.println("Run work");
Thread.sleep(5000);
System.out.println("Request more work");
upstream.request(1);
return null; // Callable
});
System.out.println("End onNext");
}
使用此设置,上游将调用一次onNext
,并且仅当执行者执行的任务发出下一个请求时才会调用它。请注意,除非 Publisher
从专用线程发出,否则上面的示例最终会将 onNext
调用拖到 executor
的线程上。
onNext() 的调用不应 运行 并行。他们可以 运行 来自不同的线程(取决于实现),但总是顺序的。但即使按顺序,它们也可以以高于订户可以处理的速率被调用。因此订阅者仅在有空间容纳 n 个传入项目时才调用 request(n)。通常它只有一个值的空间,所以当这个变量空闲时它调用 request(1)。