How to cancel the subscription to a Reactive Stream in Vert.x
I am using ReactiveStreams to publish SSE events in Vert.x:
ReactiveStreams.fromPublisher(vertx.periodicStream(1000).toPublisher())
    .map(l -> String.format("Number of Customer added %s .%n",
        customerRepository.findAll().size() + " "))
    .buildRs();
Is there any way to cancel the subscription to the stream?
Thanks
The buildRs() method returns a Publisher:
Publisher<String> publisher = ReactiveStreams.fromPublisher(vertx.periodicStream(1000).toPublisher())
    .map(l -> String.format("Number of Customer added %s .%n",
        customerRepository.findAll().size() + " "))
    .buildRs();
When you subscribe to this Publisher, you can keep a reference to the Subscription and call cancel() on it once you no longer need events:
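import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;

publisher
    .subscribe(new Subscriber<String>() {
        // Kept so that later callbacks (or other code) can cancel the stream
        volatile Subscription subscription;

        @Override
        public void onSubscribe(Subscription subscription) {
            this.subscription = subscription;
            // Signal demand; without a request() call no item is ever delivered
            subscription.request(Long.MAX_VALUE);
        }

        @Override
        public void onNext(String s) {
            // Call this once no more events are needed
            subscription.cancel();
        }

        @Override
        public void onError(Throwable throwable) {
            // handle error
        }

        @Override
        public void onComplete() {
            // handle complete
        }
    });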
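For anyone who wants to try the cancel pattern without a Vert.x project, here is a minimal sketch that runs on the JDK alone. It assumes java.util.concurrent.Flow as a stand-in for the org.reactivestreams interfaces (the two sets of interfaces mirror each other) and SubmissionPublisher in place of the periodic stream. The names CancelDemo, LimitedSubscriber and takeFirst are made up for this illustration and are not part of Vert.x or MicroProfile.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;

final class CancelDemo {

    /** Hypothetical subscriber: collects items and cancels after `limit` of them. */
    static final class LimitedSubscriber<T> implements Flow.Subscriber<T> {
        final List<T> received = new CopyOnWriteArrayList<>();
        final CountDownLatch done = new CountDownLatch(1);
        private final int limit;
        private volatile Flow.Subscription subscription;

        LimitedSubscriber(int limit) {
            this.limit = limit;
        }

        @Override
        public void onSubscribe(Flow.Subscription subscription) {
            this.subscription = subscription;
            subscription.request(1); // ask for the first item
        }

        @Override
        public void onNext(T item) {
            received.add(item);
            if (received.size() >= limit) {
                subscription.cancel(); // no more events needed
                done.countDown();
            } else {
                subscription.request(1); // ask for the next item
            }
        }

        @Override
        public void onError(Throwable throwable) {
            done.countDown();
        }

        @Override
        public void onComplete() {
            done.countDown();
        }
    }

    /** Publishes 0..total-1 and returns what the subscriber saw before it cancelled. */
    static List<Integer> takeFirst(int limit, int total) {
        LimitedSubscriber<Integer> subscriber = new LimitedSubscriber<>(limit);
        try (SubmissionPublisher<Integer> publisher = new SubmissionPublisher<>()) {
            publisher.subscribe(subscriber);
            for (int i = 0; i < total; i++) {
                publisher.submit(i);
            }
        } // close() completes the stream for subscribers that have not cancelled
        try {
            // Delivery is asynchronous, so wait for cancel or completion
            subscriber.done.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return List.copyOf(subscriber.received);
    }

    public static void main(String[] args) {
        System.out.println(takeFirst(3, 10));
    }
}
```

Requesting one item at a time means that after cancel() there is no outstanding demand, so nothing further is delivered. With request(Long.MAX_VALUE) a publisher may still deliver items that were already in flight when cancel() was called, so onNext should tolerate a few late events.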