RxJava 退订网络关闭

RxJava unsubscribe on Network close

我目前正在尝试取消订阅一个可观察对象,如果网络请求已关闭,则不会进一步流式传输数据。

我的 Observable 是通过以下方式创建的:

Observable.interval(1, TimeUnit.SECONDS)
.map { LocalDateTime.now() }.share()

所以有多个订阅者。但是我不知道在网络关闭的情况下如何退订。

我目前正在通过 vert.x 将服务器发送事件的数据流式传输到客户端,如下所示:

flow.subscribe({
  response.write("the current time is $it")
}, ::println, {
  response.end()
})

如果我取消来自客户端的请求,observable 将继续 "stream" 数据。

感谢帮助

您可以通过调用 dispose()

取消订阅
Disposable disposable = flow.subscribe({
  response.write("the current time is $it")
}, ::println, {
  response.end()
})

disposable.dispose();

更新:自定义可观察

val observable: Observable<LocalDateTime> = Observable.create { emitter ->
    val timer = Timer()
    timer.schedule(timerTask {
        if (emitter.isDisposed) {//<-- cancel emmitting items if disposed
            timer.cancel()
        } else {
            emitter.onNext(LocalDateTime.now())
        }
    }, 0, 1000)

}
disposable = observable.share().subscribe { t ->
    System.out.println(" Hello World! $t");
    disposable?.dispose()//<-- here calling dispose() causes observable to stop emitting items
}

您可以使用 takeUntil 运算符响应 closeHandler:

router.get("/test").handler(ctx -> {
    ctx.response().setChunked(true);
    Observable.interval(1, TimeUnit.SECONDS)
            .share()
            .doFinally(() -> System.out.println("Disposed"))
            .takeUntil(Observable.create(e -> ctx.response().closeHandler(event -> e.onComplete())))
            .subscribe(l -> ctx.response().write(l.toString()));
});