WebFlux 控制器返回通量和背压

WebFlux Controllers Returning Flux and Backpressure

Spring WebFlux中我有一个类似这样的控制器:

@RestController
@RequestMapping("/data")
public class DataController {

  @GetMapping(produces = MediaType.APPLICATION_JSON_VALUE)
  public Flux<Data> getData() {
    return <data from database using reactive driver>
  } 
}
  1. 订阅发布者到底是什么?
  2. 什么(如果有的话)提供背压?

对于上下文,我正在尝试评估在这种特定情况下使用 Spring WebFlux 是否比 Spring 有优势MVC.

What exactly is subscribing to the publisher?

框架(所以 Spring,在这种情况下。)

一般来说,您不应该在自己的应用程序中订阅 - 框架应该在必要时订阅您的发布者。在 spring 的上下文中,每当相关请求到达该控制器时。

What (if anything) is providing backpressure?

在这种情况下,它只是受连接速度的限制(我相信Webflux会查看底层的TCP层)然后根据需要请求数据。您的上游流量是否 监听 背压是另一回事 - 它可能会,或者它可能只是向消费者提供尽可能多的数据。

For context I'm trying to evaluate if there are advantages to using Spring WebFlux in this specific situation over Spring MVC.

主要优点是能够仅用几个线程保持打开的大量连接——因此没有上下文切换的开销。 (这不是 唯一的 优势,但大多数优势通常都归结为这一点。)通常,这只是一个值得考虑的优势,如果你需要保持在数千的区域连接立即打开。

主要缺点是响应式代码看起来 非常 不同于标准 Java 代码,因此通常必然更复杂。调试也更难 - 例如,原始堆栈跟踪几乎变得毫无用处(尽管它们是使调试更容易的工具和技术。)

注意:我不是spring框架的开发者,所以欢迎任何评论。

What exactly is subscribing to the publisher?

这是对端口的长期订阅(服务器初始化本身)。因此,ReactorHttpServer.class 有方法:

@Override
protected void startInternal() {
    DisposableServer server = this.reactorServer.handle(this.reactorHandler).bind().block();
    setPort(((InetSocketAddress) server.address()).getPort());
    this.serverRef.set(server);
}

Subscriber 是绑定方法,(据我所知)request(Long.MAX_VALUE),所以这里没有背压管理。

请求处理的重要部分是方法handle(this.reactorHandler)reactorHandlerReactorHttpHandlerAdapter 的实例。堆栈的更上层(在 ReactorHttpHandlerAdapterapply 方法中)是 DispatcherHandler.class。此 class 的 java 文档开头为“HTTP 请求的中央调度程序 handlers/controllers。调度到已注册的处理程序以处理请求,提供方便的映射工具。”。它有中心方法:

@Override
public Mono<Void> handle(ServerWebExchange exchange) {
    if (this.handlerMappings == null) {
        return createNotFoundError();
    }
    return Flux.fromIterable(this.handlerMappings)
            .concatMap(mapping -> mapping.getHandler(exchange))
            .next()
            .switchIfEmpty(createNotFoundError())
            .flatMap(handler -> invokeHandler(exchange, handler))
            .flatMap(result -> handleResult(exchange, result));
}

在这里,实际的请求处理发生了。回复写在handleResult内。现在取决于实际的服务器实现,结果如何写。

对于默认服务器,即 Reactor Netty,它将是 ReactorServerHttpResponse.class。在这里你可以看到方法writeWithInternal。这个获取处理程序方法的 publisher 结果并将其写入底层 HTTP 连接:

@Override
protected Mono<Void> writeWithInternal(Publisher<? extends DataBuffer> publisher) {
    return this.response.send(toByteBufs(publisher)).then();
}   

NettyOutbound.send( ... ) 的一个实现是 reactor.netty.channel.ChannelOperations。对于 Flux return 的特定情况,此实现管理 MonoSendMany.class 内的 NIO。这个 class 使用 SendManyInner.class 执行 subscribe( ... ),它通过实施 onSubscribe 执行 request(128)Subscriber 来执行背压管理。我猜 Netty 内部使用 TCP ACK 来表示传输成功。

所以,

What (if anything) is providing backpressure?

... 是的,提供背压,例如SendManyInner.class,但也存在其他实现。

For context I'm trying to evaluate if there are advantages to using Spring WebFlux in this specific situation over Spring MVC.

我觉得,绝对值得评价。但是我猜想,对于性能,结果将取决于并发请求的数量,也可能取决于您的 Data class 的类型。一般来说,Webflux 通常是高吞吐量、低延迟情况的首选,而且我们通常会在我们的环境中看到更好的硬件利用率。假设您从数据库中获取数据,您可能会使用也支持反应式的数据库驱动程序获得最佳结果。除了性能之外,背压管理始终是了解 Webflux 的一个很好的理由。自从我们采用了Webflux之后,我们的数据平台再也没有稳定性问题(不是说,没有其他方法可以让系统稳定,但这里很多问题都是开箱即用的)。

附带说明:我建议,仔细看看 Schedulers 我们最近通过为慢速数据库访问选择正确的时间获得了 30% cpu 时间。

编辑:https://docs.spring.io/spring/docs/current/spring-framework-reference/web-reactive.html#webflux-fn-handler-functions 参考文档中明确指出:

ServerRequest and ServerResponse are immutable interfaces that offer JDK 8-friendly access to the HTTP request and response. Both request and response provide Reactive Streams back pressure against the body streams.