WebClient 如何刷新部分接收到的数据?
WebClient How to flush part of received data?
我想对接收到的内容进行部分刷新。例如,我有处理程序:
return client
.post()
.body(BodyInserters.fromDataBuffers(
request.body(BodyExtractors.toDataBuffers())))
.exchange()
.....
如何在接收到一定数量的数据缓冲区时强制刷新?
首先,警告一下:
默认情况下(即如果你不手动刷新),Netty 会缓冲字节并不时刷新它们,只要通道准备好并且刷新策略认为方便。这针对性能进行了优化。
如果你想手动刷新,它不保证对方会以相同的方式接收这些字节组;中间人可能会在此过程中起到缓冲作用。这可能无法实现您想要做的事情:手动刷新通常与性能优化无关,而是与协议语义有关。
使用手动刷新策略只有在将其与协议语义(如消息分隔符)配对时才有用,以便另一方知道如何拆分消息(这就是 Spring WebFlux 正在为 SSE 和 application/streaming+json
.
做
现在,为了实现这一点,Reactor 提供了几个具有不同策略的 windowXYZ
运算符。 Flux.window(int)
基于元素的数量,windowTimeout(Duration)
基于持续时间,等等。在这种情况下,您可能希望使用 windowUntil(Predicate)
.
让我们尝试实现在缓冲一定数量的数据时刷新的东西。
Flux<DataBuffer> buffers = //...;
int maxSize = //...;
AtomicInteger currentSize = new AtomicInteger(0);
Flux<Flux<DataBuffer>> bufferWindow = buffers.windowUntil(buf -> {
if (currentSize.addAndGet(buf.readableByteCount()) < maxSize) {
return false;
}
currentSize.set(0);
return true;
});
WebClient.create()
.post()
.body((outputMessage, context) -> outputMessage.writeAndFlushWith(bufferWindow))
.retrieve();
请注意,如果您正在对无限数据流进行操作,则实施 存在缺陷:在达到配额或源完成之前不会刷新。所以这可能会保留数据超过必要的时间。
我想对接收到的内容进行部分刷新。例如,我有处理程序:
return client
.post()
.body(BodyInserters.fromDataBuffers(
request.body(BodyExtractors.toDataBuffers())))
.exchange()
.....
如何在接收到一定数量的数据缓冲区时强制刷新?
首先,警告一下:
默认情况下(即如果你不手动刷新),Netty 会缓冲字节并不时刷新它们,只要通道准备好并且刷新策略认为方便。这针对性能进行了优化。
如果你想手动刷新,它不保证对方会以相同的方式接收这些字节组;中间人可能会在此过程中起到缓冲作用。这可能无法实现您想要做的事情:手动刷新通常与性能优化无关,而是与协议语义有关。
使用手动刷新策略只有在将其与协议语义(如消息分隔符)配对时才有用,以便另一方知道如何拆分消息(这就是 Spring WebFlux 正在为 SSE 和
application/streaming+json
. 做
现在,为了实现这一点,Reactor 提供了几个具有不同策略的 windowXYZ
运算符。 Flux.window(int)
基于元素的数量,windowTimeout(Duration)
基于持续时间,等等。在这种情况下,您可能希望使用 windowUntil(Predicate)
.
让我们尝试实现在缓冲一定数量的数据时刷新的东西。
Flux<DataBuffer> buffers = //...;
int maxSize = //...;
AtomicInteger currentSize = new AtomicInteger(0);
Flux<Flux<DataBuffer>> bufferWindow = buffers.windowUntil(buf -> {
if (currentSize.addAndGet(buf.readableByteCount()) < maxSize) {
return false;
}
currentSize.set(0);
return true;
});
WebClient.create()
.post()
.body((outputMessage, context) -> outputMessage.writeAndFlushWith(bufferWindow))
.retrieve();
请注意,如果您正在对无限数据流进行操作,则实施 存在缺陷:在达到配额或源完成之前不会刷新。所以这可能会保留数据超过必要的时间。