当客户端中止请求时,WebFlux 如何停止发布者?

How in WebFlux to stop publisher when request is aborted by client?

SpringBoot v2.5.1

有一个端点请求很长的 运行 处理结果,它是以某种方式创建的
(为简单起见,它是 Mono.fromCallable( ... long running ... ).

客户端发出请求并触发发布者完成工作,但几秒钟后客户端中止请求(即连接丢失)。并且进程还在继续利用资源计算结果扔掉。

通知 Project Reactor 事件循环关于应该取消的不必要的正在进行的工作的机制是什么?

@RestController 
class EndpointSpin {
 
  @GetMapping("/spin")
  Mono<Long> spin() {
    AtomicLong counter = new AtomicLong(0);
    Instant stopTime = Instant.now().plus(Duration.of(1, ChronoUnit.HOURS));

    return Mono.fromCallable(() -> {

      while (Instant.now().isBefore(stopTime)) {
        counter.incrementAndGet();

        if (counter.get() % 10_000_000 == 0) {
          System.out.println(counter.get());
        }

        // of course this does not work
        if (Thread.currentThread().isInterrupted()){
           break;
        }
      }

      return counter.get();
    });
  }
}

fromCallable 不会阻止您在 Callable 内阻止计算,您的示例演示了这一点。

Reactive Streams 中取消的主要方式是 cancel() 信号通过 Subscription 从下游传播。

即便如此,避免在反应式代码中阻塞代码的基本要求仍然成立,因为如果运算符足够简单(即同步),阻塞步骤甚至可以阻止 cancel() 的传播信号...

一种适应非反应性代码同时仍然收到取消通知的方法是Mono.create:它公开了一个MonoSink(通过Consumer<MonoSink>)可以用来推送元素到下游,同时它有一个 onCancel 处理程序。

您需要将代码重写为例如。在循环的每次迭代中检查 AtomicBoolean,并在接收器的 onCancel 处理程序中翻转 AtomicBoolean:

Mono.create(sink -> {
    AtomicBoolean isCancelled = new AtomicBoolean();
    sink.onCancel(() -> isCancelled.set(true));
    while (...) {
        ...
        if (isCancelled.get()) break;
    }
});

在您的示例中需要注意的另一件事是:AtomicInteger 是共享状态。如果您第二次订阅返回的 Mono,两个订阅将共享计数器并并行递增/检查它,这可能不太好。

Mono.createConsumer<MonoSink> 中创建这些状态变量可确保每个订阅都有自己独立的状态。