如何处理 sse 连接关闭?
How to handle sse connection closed?
我有一个像示例代码块中那样流式传输的端点。流式传输时,我通过 streamHelper.getStreamSuspendCount()
调用异步方法。我在改变状态时停止这个异步方法。但是当浏览器关闭并且会话终止时,我无法访问这个异步方法。更改状态时,我将停止会话范围内的异步方法。但是当浏览器关闭并且会话终止时,我无法访问这个异步方法。 Session 关闭时如何访问此范围?
@RequestMapping(value = "/stream/{columnId}/suspendCount", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
@ResponseBody
public Flux<Integer> suspendCount(@PathVariable String columnId) {
ColumnObject columnObject = streamHelper.findColumnObjectInListById(columnId);
return streamHelper.getStreamSuspendCount(columnObject);
}
getStreamSuspendCount(ColumnObject columnObject) {
...
//async flux
Flux<?> newFlux = beSubscribeFlow.get(i);
Disposable disposable = newFlux.subscribe();
beDisposeFlow.add(disposable); // my session scope variable. if change state, i will kill disposable (dispose()).
...
return Flux.fromStream(Stream.generate(() -> columnObject.getPendingObject().size())).distinctUntilChanged()
.doOnNext(i -> {
System.out.println(i);
}));
}
我认为部分问题是您试图获得一个 Disposable
并希望在会话结束时调用它。但在这样做时,您是在自己订阅序列。 Spring 框架还将订阅 getStreamSuspendCount
返回的 Flux
,并且需要取消该订阅才能让 SSE 客户端收到通知。
现在如何实现呢?您需要的是一种 "valve" ,它会在接收到外部信号时取消其来源。这就是 takeUntilOther(Publisher<?>)
所做的。
所以现在你需要一个 Publisher<?>
可以绑定到会话生命周期(更具体地说是会话关闭事件):一旦它发出,takeUntilOther
将取消它的源。
2 个选项:
- 会话关闭事件暴露在类似监听器中API:使用
Mono.create
- 您确实需要手动触发取消:使用
MonoProcessor.create()
并在时机成熟时通过它推送任何值
以下是简化的示例,其中包含 API 以说明:
创建
return theFluxForSSE.takeUntilOther(Mono.create(sink ->
sessionEvent.registerListenerForClose(closeEvent -> sink.success(closeEvent))
));
单处理器
MonoProcessor<String> processor = MonoProcessor.create();
beDisposeFlow.add(processor); // make it available to your session scope?
return theFluxForSSE.takeUntilOther(processor); //Spring will subscribe to this
让我们用计划任务模拟会话关闭:
Executors.newSingleThreadScheduledExecutor().schedule(() ->
processor.onNext("STOP") // that's the key part: manually sending data through the processor to signal takeUntilOther
, 2, TimeUnit.SECONDS);
这是一个模拟单元测试示例,您可以 运行 更好地理解会发生什么:
@Test
public void simulation() {
Flux<Long> theFluxForSSE = Flux.interval(Duration.ofMillis(100));
MonoProcessor<String> processor = MonoProcessor.create();
Executors.newSingleThreadScheduledExecutor().schedule(() -> processor.onNext("STOP"), 2, TimeUnit.SECONDS);
theFluxForSSE.takeUntilOther(processor.log())
.log()
.blockLast();
}
我有一个像示例代码块中那样流式传输的端点。流式传输时,我通过 streamHelper.getStreamSuspendCount()
调用异步方法。我在改变状态时停止这个异步方法。但是当浏览器关闭并且会话终止时,我无法访问这个异步方法。更改状态时,我将停止会话范围内的异步方法。但是当浏览器关闭并且会话终止时,我无法访问这个异步方法。 Session 关闭时如何访问此范围?
@RequestMapping(value = "/stream/{columnId}/suspendCount", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
@ResponseBody
public Flux<Integer> suspendCount(@PathVariable String columnId) {
ColumnObject columnObject = streamHelper.findColumnObjectInListById(columnId);
return streamHelper.getStreamSuspendCount(columnObject);
}
getStreamSuspendCount(ColumnObject columnObject) {
...
//async flux
Flux<?> newFlux = beSubscribeFlow.get(i);
Disposable disposable = newFlux.subscribe();
beDisposeFlow.add(disposable); // my session scope variable. if change state, i will kill disposable (dispose()).
...
return Flux.fromStream(Stream.generate(() -> columnObject.getPendingObject().size())).distinctUntilChanged()
.doOnNext(i -> {
System.out.println(i);
}));
}
我认为部分问题是您试图获得一个 Disposable
并希望在会话结束时调用它。但在这样做时,您是在自己订阅序列。 Spring 框架还将订阅 getStreamSuspendCount
返回的 Flux
,并且需要取消该订阅才能让 SSE 客户端收到通知。
现在如何实现呢?您需要的是一种 "valve" ,它会在接收到外部信号时取消其来源。这就是 takeUntilOther(Publisher<?>)
所做的。
所以现在你需要一个 Publisher<?>
可以绑定到会话生命周期(更具体地说是会话关闭事件):一旦它发出,takeUntilOther
将取消它的源。
2 个选项:
- 会话关闭事件暴露在类似监听器中API:使用
Mono.create
- 您确实需要手动触发取消:使用
MonoProcessor.create()
并在时机成熟时通过它推送任何值
以下是简化的示例,其中包含 API 以说明:
创建
return theFluxForSSE.takeUntilOther(Mono.create(sink ->
sessionEvent.registerListenerForClose(closeEvent -> sink.success(closeEvent))
));
单处理器
MonoProcessor<String> processor = MonoProcessor.create();
beDisposeFlow.add(processor); // make it available to your session scope?
return theFluxForSSE.takeUntilOther(processor); //Spring will subscribe to this
让我们用计划任务模拟会话关闭:
Executors.newSingleThreadScheduledExecutor().schedule(() ->
processor.onNext("STOP") // that's the key part: manually sending data through the processor to signal takeUntilOther
, 2, TimeUnit.SECONDS);
这是一个模拟单元测试示例,您可以 运行 更好地理解会发生什么:
@Test
public void simulation() {
Flux<Long> theFluxForSSE = Flux.interval(Duration.ofMillis(100));
MonoProcessor<String> processor = MonoProcessor.create();
Executors.newSingleThreadScheduledExecutor().schedule(() -> processor.onNext("STOP"), 2, TimeUnit.SECONDS);
theFluxForSSE.takeUntilOther(processor.log())
.log()
.blockLast();
}