在请求写入完成之前,WebClient 不会读取响应。
WebClient doesn't read response until request write is completed.
我正在尝试实施流媒体代理。
我遇到了来自 spring reactive.
的 WebClient 问题
任何人都可以帮助我理解我的方法是错误的还是只是 WebClient 端的错误?
堆栈:
reactor-netty 0.7.8.RELEASE
spring-boot 2.0.4.RELEASE
描述:
我想将一个长流代理到外部服务,然后将响应流转发给请求者。
流式传输使用块(HTTP 1.1 传输编码:块)进行。
外部服务处理每个块并发送到响应结果。
预期行为:
WebClient 应立即读取每个收到的响应部分。
实际行为:
在请求写入完成之前,WebClient 不会开始处理响应。
代码:
return client
.post()
.header("Transfer-Encoding", "chunked")
//because I want to flush each received part
.body((outputMessage, context) -> outputMessage.writeAndFlushWith(
request.body(BodyExtractors.toDataBuffers())
.map(dataBuffer -> Mono.just(dataBuffer))))
.exchange()
.flatMap(clientResponse -> {
ServerResponse.BodyBuilder bodyBuilder = ServerResponse.status(clientResponse.statusCode());
bodyBuilder.contentType(MediaType.APPLICATION_STREAM_JSON);
return bodyBuilder.body((outputMessage, context) ->
outputMessage.writeAndFlushWith(
clientResponse.body(BodyExtractors.toDataBuffers())
.map(dataBuffer -> Mono.just(dataBuffer))
));}
);
我查了一下,好像是设计的,SpringWebFlux的WebClient
和Reactor NettyHttpClient
都是为了先处理请求处理(发送请求正文),然后阅读响应正文。
其他 HTTP 客户端可能允许这样做,但我认为在这种情况下,这是一种 link 对两个 read/write 操作施加背压并将所有内容链接为单个反应管道的方法。
您可能正在寻找具有背压支持的面向消息的双向传输协议。您可以查看 WebSockets(尽管您需要在那里定义自己的消息语义)或关注 RSocket.
如果您只是在寻找一个高效的响应式网关,那么 Spring Cloud Gateway 是您的最佳选择,因为它一直是响应式的并且支持有趣的附加功能。
一些补充说明:
Spring WebFlux(在客户端和服务器级别)使用 Encoder
和 Decoder
实现,适应消息内容类型。一些特定的内容类型,例如 application/streaming+json
或 text/event-stream
是在考虑流媒体场景的情况下实现的。这意味着编码器在消息到来时写入消息,用特定字符分隔,并在网络上刷新。使用 application/octet-stream
或 application/json
等常规媒体类型不会触发该行为。对于这些情况,代理和中介可能会缓冲消息正文并传递 bigger/smaller windows。这就是为什么此类机制需要消息和适当的编解码器之间的分隔符。
据我了解,您使用的是 HTTP 1.1,它使用 request/response 机制 - HTTP 规范并未明确禁止服务器在读取完整请求之前写入响应,但它确实说无论如何它都必须读取完整的请求正文(或关闭连接)。参见 https://www.rfc-editor.org/rfc/rfc7230#section-3.4
一如既往,您可以在 https://jira.spring.io 上请求增强功能,尽管在这种情况下,我认为这是设计使然。
刚刚测试了基于 Jetty 的 WebClient 实现,它的行为符合您的预期。它可以在所有请求内容发送之前开始读取响应。
它应该在 Spring Framework 5.1 中发布
WebClient on Jetty new feature issue
我正在尝试实施流媒体代理。 我遇到了来自 spring reactive.
的 WebClient 问题任何人都可以帮助我理解我的方法是错误的还是只是 WebClient 端的错误?
堆栈:
reactor-netty 0.7.8.RELEASE
spring-boot 2.0.4.RELEASE
描述:
我想将一个长流代理到外部服务,然后将响应流转发给请求者。 流式传输使用块(HTTP 1.1 传输编码:块)进行。 外部服务处理每个块并发送到响应结果。
预期行为:
WebClient 应立即读取每个收到的响应部分。
实际行为:
在请求写入完成之前,WebClient 不会开始处理响应。
代码:
return client
.post()
.header("Transfer-Encoding", "chunked")
//because I want to flush each received part
.body((outputMessage, context) -> outputMessage.writeAndFlushWith(
request.body(BodyExtractors.toDataBuffers())
.map(dataBuffer -> Mono.just(dataBuffer))))
.exchange()
.flatMap(clientResponse -> {
ServerResponse.BodyBuilder bodyBuilder = ServerResponse.status(clientResponse.statusCode());
bodyBuilder.contentType(MediaType.APPLICATION_STREAM_JSON);
return bodyBuilder.body((outputMessage, context) ->
outputMessage.writeAndFlushWith(
clientResponse.body(BodyExtractors.toDataBuffers())
.map(dataBuffer -> Mono.just(dataBuffer))
));}
);
我查了一下,好像是设计的,SpringWebFlux的WebClient
和Reactor NettyHttpClient
都是为了先处理请求处理(发送请求正文),然后阅读响应正文。
其他 HTTP 客户端可能允许这样做,但我认为在这种情况下,这是一种 link 对两个 read/write 操作施加背压并将所有内容链接为单个反应管道的方法。
您可能正在寻找具有背压支持的面向消息的双向传输协议。您可以查看 WebSockets(尽管您需要在那里定义自己的消息语义)或关注 RSocket.
如果您只是在寻找一个高效的响应式网关,那么 Spring Cloud Gateway 是您的最佳选择,因为它一直是响应式的并且支持有趣的附加功能。
一些补充说明:
Spring WebFlux(在客户端和服务器级别)使用 Encoder
和 Decoder
实现,适应消息内容类型。一些特定的内容类型,例如 application/streaming+json
或 text/event-stream
是在考虑流媒体场景的情况下实现的。这意味着编码器在消息到来时写入消息,用特定字符分隔,并在网络上刷新。使用 application/octet-stream
或 application/json
等常规媒体类型不会触发该行为。对于这些情况,代理和中介可能会缓冲消息正文并传递 bigger/smaller windows。这就是为什么此类机制需要消息和适当的编解码器之间的分隔符。
据我了解,您使用的是 HTTP 1.1,它使用 request/response 机制 - HTTP 规范并未明确禁止服务器在读取完整请求之前写入响应,但它确实说无论如何它都必须读取完整的请求正文(或关闭连接)。参见 https://www.rfc-editor.org/rfc/rfc7230#section-3.4
一如既往,您可以在 https://jira.spring.io 上请求增强功能,尽管在这种情况下,我认为这是设计使然。
刚刚测试了基于 Jetty 的 WebClient 实现,它的行为符合您的预期。它可以在所有请求内容发送之前开始读取响应。 它应该在 Spring Framework 5.1 中发布 WebClient on Jetty new feature issue