流编程:订阅者和发布者要保持跟踪计数?

Flow programming: subscriber and publisher to keep track of count?

关于 Java9 中新的 Flow 相关接口,我收到了一个 article。来自那里的示例代码:

public class MySubscriber<T> implements Subscriber<T> {  
  private Subscription subscription;    
  @Override  
  public void onSubscribe(Subscription subscription) {  
    this.subscription = subscription;  
    subscription.request(1); //a value of  Long.MAX_VALUE may be considered as effectively unbounded  
  }        
  @Override  
  public void onNext(T item) {  
    System.out.println("Got : " + item);  
    subscription.request(1); //a value of  Long.MAX_VALUE may be considered as effectively unbounded  
   }  

如您所见,onNext() 请求推送 一个 新项目。

现在我在想:

服务器现在是否需要发送

换句话说:当多次调用request()时,这些数字相加吗?还是之前的请求 "discarded"?

引出问题标题 - 订户是否需要跟踪收到的物品,以避免在某个时候请求 "too many" 物品。

正如 Sotirios , the request method's Javadoc 所说(强调我的):

Adds the given number n of items to the current unfulfilled demand for this subscription. If n is less than or equal to zero, the Subscriber will receive an onError signal with an IllegalArgumentException argument. Otherwise, the Subscriber will receive up to n additional onNext invocations (or fewer if terminated).

所以答案很明确是的,订阅者需要跟踪项目。事实上,这就是该机制的全部要点。一些背景: request 方法旨在允许订阅者应用 backpressure,通知上游组件它已过载和 "needs a break"。因此,订阅者(仅是其)的任务是仔细审查何时以及请求多少新项目。在该行中,它不能 "reconsider" 并降低要接收的项目数。

降低数量也会使发布者和订阅者之间的通信 "non-monotonic" 在某种意义上,总请求项目的数量可能会突然 降低(就目前而言只能增加)。这不仅在抽象意义上很烦人,它还带来了具体的一致性问题:当订阅者突然将请求的项目数量减少到 1 时,发布者可能正在交付一些项目——现在怎么办?

虽然 Java 9 没有实现 Reactive Streams API,但它提供了几乎相同的 API 并定义了相应的行为。

而在 Reactive Streams specification 中,此加法定义如下:

对于 1.1 中的发布者:

The total number of onNext signals sent by a Publisher to a Subscriber MUST be less than or equal to the total number of elements requested by that Subscriber´s Subscription at all times.

对于 3.8 中的订阅:

While the Subscription is not cancelled, Subscription.request(long n) MUST register the given number of additional elements to be produced to the respective subscriber.

因此 Java 遵守 Reactive Streams API.

中给出的规范