RxJava 退订网络关闭
RxJava unsubscribe on Network close
我目前正在尝试取消订阅一个可观察对象,如果网络请求已关闭,则不会进一步流式传输数据。
我的 Observable 是通过以下方式创建的:
Observable.interval(1, TimeUnit.SECONDS)
.map { LocalDateTime.now() }.share()
所以有多个订阅者。但是我不知道在网络关闭的情况下如何退订。
我目前正在通过 vert.x 将服务器发送事件的数据流式传输到客户端,如下所示:
flow.subscribe({
response.write("the current time is $it")
}, ::println, {
response.end()
})
如果我取消来自客户端的请求,observable 将继续 "stream" 数据。
感谢帮助
您可以通过调用 dispose()
取消订阅
Disposable disposable = flow.subscribe({
response.write("the current time is $it")
}, ::println, {
response.end()
})
disposable.dispose();
更新:自定义可观察
val observable: Observable<LocalDateTime> = Observable.create { emitter ->
val timer = Timer()
timer.schedule(timerTask {
if (emitter.isDisposed) {//<-- cancel emmitting items if disposed
timer.cancel()
} else {
emitter.onNext(LocalDateTime.now())
}
}, 0, 1000)
}
disposable = observable.share().subscribe { t ->
System.out.println(" Hello World! $t");
disposable?.dispose()//<-- here calling dispose() causes observable to stop emitting items
}
您可以使用 takeUntil
运算符响应 closeHandler
:
router.get("/test").handler(ctx -> {
ctx.response().setChunked(true);
Observable.interval(1, TimeUnit.SECONDS)
.share()
.doFinally(() -> System.out.println("Disposed"))
.takeUntil(Observable.create(e -> ctx.response().closeHandler(event -> e.onComplete())))
.subscribe(l -> ctx.response().write(l.toString()));
});
我目前正在尝试取消订阅一个可观察对象,如果网络请求已关闭,则不会进一步流式传输数据。
我的 Observable 是通过以下方式创建的:
Observable.interval(1, TimeUnit.SECONDS)
.map { LocalDateTime.now() }.share()
所以有多个订阅者。但是我不知道在网络关闭的情况下如何退订。
我目前正在通过 vert.x 将服务器发送事件的数据流式传输到客户端,如下所示:
flow.subscribe({
response.write("the current time is $it")
}, ::println, {
response.end()
})
如果我取消来自客户端的请求,observable 将继续 "stream" 数据。
感谢帮助
您可以通过调用 dispose()
取消订阅Disposable disposable = flow.subscribe({
response.write("the current time is $it")
}, ::println, {
response.end()
})
disposable.dispose();
更新:自定义可观察
val observable: Observable<LocalDateTime> = Observable.create { emitter ->
val timer = Timer()
timer.schedule(timerTask {
if (emitter.isDisposed) {//<-- cancel emmitting items if disposed
timer.cancel()
} else {
emitter.onNext(LocalDateTime.now())
}
}, 0, 1000)
}
disposable = observable.share().subscribe { t ->
System.out.println(" Hello World! $t");
disposable?.dispose()//<-- here calling dispose() causes observable to stop emitting items
}
您可以使用 takeUntil
运算符响应 closeHandler
:
router.get("/test").handler(ctx -> {
ctx.response().setChunked(true);
Observable.interval(1, TimeUnit.SECONDS)
.share()
.doFinally(() -> System.out.println("Disposed"))
.takeUntil(Observable.create(e -> ctx.response().closeHandler(event -> e.onComplete())))
.subscribe(l -> ctx.response().write(l.toString()));
});