WebClient 如何刷新部分接收到的数据?

WebClient How to flush part of received data?

我想对接收到的内容进行部分刷新。例如,我有处理程序:

return client
    .post()
    .body(BodyInserters.fromDataBuffers(
        request.body(BodyExtractors.toDataBuffers())))
    .exchange()
    .....

如何在接收到一定数量的数据缓冲区时强制刷新?

首先,警告一下:

  • 默认情况下(即如果你不手动刷新),Netty 会缓冲字节并不时刷新它们,只要通道准备好并且刷新策略认为方便。这针对性能进行了优化。

  • 如果你想手动刷新,它不保证对方会以相同的方式接收这些字节组;中间人可能会在此过程中起到缓冲作用。这可能无法实现您想要做的事情:手动刷新通常与性能优化无关,而是与协议语义有关。

  • 使用手动刷新策略只有在将其与协议语义(如消息分隔符)配对时才有用,以便另一方知道如何拆分消息(这就是 Spring WebFlux 正在为 SSE 和 application/streaming+json.

现在,为了实现这一点,Reactor 提供了几个具有不同策略的 windowXYZ 运算符。 Flux.window(int) 基于元素的数量,windowTimeout(Duration) 基于持续时间,等等。在这种情况下,您可能希望使用 windowUntil(Predicate).

让我们尝试实现在缓冲一定数量的数据时刷新的东西。

Flux<DataBuffer> buffers = //...;

int maxSize = //...;
AtomicInteger currentSize = new AtomicInteger(0);
Flux<Flux<DataBuffer>> bufferWindow = buffers.windowUntil(buf -> {
    if (currentSize.addAndGet(buf.readableByteCount()) < maxSize) {
        return false;
    }
    currentSize.set(0);
    return true;
});

WebClient.create()
        .post()
        .body((outputMessage, context) -> outputMessage.writeAndFlushWith(bufferWindow))
        .retrieve();

请注意,如果您正在对无限数据流进行操作,则实施 存在缺陷:在达到配额或源完成之前不会刷新。所以这可能会保留数据超过必要的时间。