如何在 Vert.x 中取消对 Reactive Stream 的订阅

How to cancel the subscription to a Reactive Stream in Vert.x


我正在使用 ReactiveStreams 在 Vert.x:

中发布 SSE 事件
ReactiveStreams.fromPublisher(vertx.periodicStream(1000).toPublisher())
        .map(l -> String.format("Number of Customer added %s .%n",
                customerRepository.findAll().size() + " "))
        .buildRs();

有什么方法可以取消数据流的订阅吗? 谢谢

buildRs()方法returns一个Publisher:

Publisher<String> publisher = ReactiveStreams.fromPublisher(vertx.periodicStream(1000).toPublisher())
  .map(l -> String.format("Number of Customer added %s .%n",
    customerRepository.findAll().size() + " "))
  .buildRs();

当您订阅此 Publisher 时,您可以保留对 Subscription 的引用,然后在完成后取消发射:

publisher
  .subscribe(new Subscriber<String>() {
    volatile Subscription subscription;

    @Override
    public void onSubscribe(Subscription subscription) {
      this.subscription = subscription;
    }

    @Override
    public void onNext(String s) {
      // when no more event is needed
      subscription.cancel();
    }

    @Override
    public void onError(Throwable throwable) {
      // handle error
    }

    @Override
    public void onComplete() {
      // handle complete
    }
  });