流编程:订阅者和发布者要保持跟踪计数?
Flow programming: subscriber and publisher to keep track of count?
关于 Java9 中新的 Flow
相关接口,我收到了一个 article。来自那里的示例代码:
public class MySubscriber<T> implements Subscriber<T> {
private Subscription subscription;
@Override
public void onSubscribe(Subscription subscription) {
this.subscription = subscription;
subscription.request(1); //a value of Long.MAX_VALUE may be considered as effectively unbounded
}
@Override
public void onNext(T item) {
System.out.println("Got : " + item);
subscription.request(1); //a value of Long.MAX_VALUE may be considered as effectively unbounded
}
如您所见,onNext()
请求推送 一个 新项目。
现在我在想:
- 如果
onSubscribe()
要求,说 5 件
- 并且在第一个项目交付后,
request(1)
像上面那样被调用
服务器现在是否需要发送
- 五件物品(5 件请求。-1 件已发送。+1 件请求)
- 或一个项目(因为之前的请求通过新请求获得 "discarded")
换句话说:当多次调用request()
时,这些数字相加吗?还是之前的请求 "discarded"?
引出问题标题 - 订户是否需要跟踪收到的物品,以避免在某个时候请求 "too many" 物品。
正如 Sotirios , the request
method's Javadoc 所说(强调我的):
Adds the given number n
of items to the current unfulfilled demand for this subscription. If n
is less than or equal to zero, the Subscriber will receive an onError signal with an IllegalArgumentException argument. Otherwise, the Subscriber will receive up to n
additional onNext invocations (or fewer if terminated).
所以答案很明确是的,订阅者需要跟踪项目。事实上,这就是该机制的全部要点。一些背景: request
方法旨在允许订阅者应用 backpressure,通知上游组件它已过载和 "needs a break"。因此,订阅者(仅是其)的任务是仔细审查何时以及请求多少新项目。在该行中,它不能 "reconsider" 并降低要接收的项目数。
降低数量也会使发布者和订阅者之间的通信 "non-monotonic" 在某种意义上,总请求项目的数量可能会突然 降低(就目前而言只能增加)。这不仅在抽象意义上很烦人,它还带来了具体的一致性问题:当订阅者突然将请求的项目数量减少到 1 时,发布者可能正在交付一些项目——现在怎么办?
虽然 Java 9 没有实现 Reactive Streams API,但它提供了几乎相同的 API 并定义了相应的行为。
而在 Reactive Streams specification 中,此加法定义如下:
对于 1.1 中的发布者:
The total number of onNext signals sent by a Publisher to a Subscriber MUST be less than or equal to the total number of elements requested by that Subscriber´s Subscription at all times.
对于 3.8 中的订阅:
While the Subscription is not cancelled, Subscription.request(long n) MUST register the given number of additional elements to be produced to the respective subscriber.
因此 Java 遵守 Reactive Streams API.
中给出的规范
关于 Java9 中新的 Flow
相关接口,我收到了一个 article。来自那里的示例代码:
public class MySubscriber<T> implements Subscriber<T> {
private Subscription subscription;
@Override
public void onSubscribe(Subscription subscription) {
this.subscription = subscription;
subscription.request(1); //a value of Long.MAX_VALUE may be considered as effectively unbounded
}
@Override
public void onNext(T item) {
System.out.println("Got : " + item);
subscription.request(1); //a value of Long.MAX_VALUE may be considered as effectively unbounded
}
如您所见,onNext()
请求推送 一个 新项目。
现在我在想:
- 如果
onSubscribe()
要求,说 5 件 - 并且在第一个项目交付后,
request(1)
像上面那样被调用
服务器现在是否需要发送
- 五件物品(5 件请求。-1 件已发送。+1 件请求)
- 或一个项目(因为之前的请求通过新请求获得 "discarded")
换句话说:当多次调用request()
时,这些数字相加吗?还是之前的请求 "discarded"?
引出问题标题 - 订户是否需要跟踪收到的物品,以避免在某个时候请求 "too many" 物品。
正如 Sotirios request
method's Javadoc 所说(强调我的):
Adds the given number
n
of items to the current unfulfilled demand for this subscription. Ifn
is less than or equal to zero, the Subscriber will receive an onError signal with an IllegalArgumentException argument. Otherwise, the Subscriber will receive up ton
additional onNext invocations (or fewer if terminated).
所以答案很明确是的,订阅者需要跟踪项目。事实上,这就是该机制的全部要点。一些背景: request
方法旨在允许订阅者应用 backpressure,通知上游组件它已过载和 "needs a break"。因此,订阅者(仅是其)的任务是仔细审查何时以及请求多少新项目。在该行中,它不能 "reconsider" 并降低要接收的项目数。
降低数量也会使发布者和订阅者之间的通信 "non-monotonic" 在某种意义上,总请求项目的数量可能会突然 降低(就目前而言只能增加)。这不仅在抽象意义上很烦人,它还带来了具体的一致性问题:当订阅者突然将请求的项目数量减少到 1 时,发布者可能正在交付一些项目——现在怎么办?
虽然 Java 9 没有实现 Reactive Streams API,但它提供了几乎相同的 API 并定义了相应的行为。
而在 Reactive Streams specification 中,此加法定义如下:
对于 1.1 中的发布者:
The total number of onNext signals sent by a Publisher to a Subscriber MUST be less than or equal to the total number of elements requested by that Subscriber´s Subscription at all times.
对于 3.8 中的订阅:
While the Subscription is not cancelled, Subscription.request(long n) MUST register the given number of additional elements to be produced to the respective subscriber.
因此 Java 遵守 Reactive Streams API.
中给出的规范