当客户端中止请求时,WebFlux 如何停止发布者?
How in WebFlux to stop publisher when request is aborted by client?
SpringBoot v2.5.1
有一个端点请求很长的 运行 处理结果,它是以某种方式创建的
(为简单起见,它是 Mono.fromCallable( ... long running ... )
.
客户端发出请求并触发发布者完成工作,但几秒钟后客户端中止请求(即连接丢失)。并且进程还在继续利用资源计算结果扔掉。
通知 Project Reactor 事件循环关于应该取消的不必要的正在进行的工作的机制是什么?
@RestController
class EndpointSpin {
@GetMapping("/spin")
Mono<Long> spin() {
AtomicLong counter = new AtomicLong(0);
Instant stopTime = Instant.now().plus(Duration.of(1, ChronoUnit.HOURS));
return Mono.fromCallable(() -> {
while (Instant.now().isBefore(stopTime)) {
counter.incrementAndGet();
if (counter.get() % 10_000_000 == 0) {
System.out.println(counter.get());
}
// of course this does not work
if (Thread.currentThread().isInterrupted()){
break;
}
}
return counter.get();
});
}
}
fromCallable
不会阻止您在 Callable
内阻止计算,您的示例演示了这一点。
Reactive Streams 中取消的主要方式是 cancel()
信号通过 Subscription
从下游传播。
即便如此,避免在反应式代码中阻塞代码的基本要求仍然成立,因为如果运算符足够简单(即同步),阻塞步骤甚至可以阻止 cancel()
的传播信号...
一种适应非反应性代码同时仍然收到取消通知的方法是Mono.create
:它公开了一个MonoSink
(通过Consumer<MonoSink>
)可以用来推送元素到下游,同时它有一个 onCancel
处理程序。
您需要将代码重写为例如。在循环的每次迭代中检查 AtomicBoolean
,并在接收器的 onCancel
处理程序中翻转 AtomicBoolean:
Mono.create(sink -> {
AtomicBoolean isCancelled = new AtomicBoolean();
sink.onCancel(() -> isCancelled.set(true));
while (...) {
...
if (isCancelled.get()) break;
}
});
在您的示例中需要注意的另一件事是:AtomicInteger
是共享状态。如果您第二次订阅返回的 Mono
,两个订阅将共享计数器并并行递增/检查它,这可能不太好。
在 Mono.create
的 Consumer<MonoSink>
中创建这些状态变量可确保每个订阅都有自己独立的状态。
SpringBoot v2.5.1
有一个端点请求很长的 运行 处理结果,它是以某种方式创建的
(为简单起见,它是 Mono.fromCallable( ... long running ... )
.
客户端发出请求并触发发布者完成工作,但几秒钟后客户端中止请求(即连接丢失)。并且进程还在继续利用资源计算结果扔掉。
通知 Project Reactor 事件循环关于应该取消的不必要的正在进行的工作的机制是什么?
@RestController
class EndpointSpin {
@GetMapping("/spin")
Mono<Long> spin() {
AtomicLong counter = new AtomicLong(0);
Instant stopTime = Instant.now().plus(Duration.of(1, ChronoUnit.HOURS));
return Mono.fromCallable(() -> {
while (Instant.now().isBefore(stopTime)) {
counter.incrementAndGet();
if (counter.get() % 10_000_000 == 0) {
System.out.println(counter.get());
}
// of course this does not work
if (Thread.currentThread().isInterrupted()){
break;
}
}
return counter.get();
});
}
}
fromCallable
不会阻止您在 Callable
内阻止计算,您的示例演示了这一点。
Reactive Streams 中取消的主要方式是 cancel()
信号通过 Subscription
从下游传播。
即便如此,避免在反应式代码中阻塞代码的基本要求仍然成立,因为如果运算符足够简单(即同步),阻塞步骤甚至可以阻止 cancel()
的传播信号...
一种适应非反应性代码同时仍然收到取消通知的方法是Mono.create
:它公开了一个MonoSink
(通过Consumer<MonoSink>
)可以用来推送元素到下游,同时它有一个 onCancel
处理程序。
您需要将代码重写为例如。在循环的每次迭代中检查 AtomicBoolean
,并在接收器的 onCancel
处理程序中翻转 AtomicBoolean:
Mono.create(sink -> {
AtomicBoolean isCancelled = new AtomicBoolean();
sink.onCancel(() -> isCancelled.set(true));
while (...) {
...
if (isCancelled.get()) break;
}
});
在您的示例中需要注意的另一件事是:AtomicInteger
是共享状态。如果您第二次订阅返回的 Mono
,两个订阅将共享计数器并并行递增/检查它,这可能不太好。
在 Mono.create
的 Consumer<MonoSink>
中创建这些状态变量可确保每个订阅都有自己独立的状态。