使用 Spring WebClient 时,由于 HttpClientOperations 收到最后一个 HTTP 数据包,连接被关闭

When use Spring WebClient, the connection is closed because of HttpClientOperations Received last HTTP packet

我正在使用 Spring WebClient 从我的服务器接收服务器发送的事件(由 Spring SSEEmitter 发布)。它工作正常,但是,由于某种原因,我的服务器定期在新行中发布一个字符串“0”。发生这种情况时,WebClient 将关闭连接。

我深入挖掘并打开 Spring 调试,发现以下信息:

     +--------------------...+
     |  0  1  2  3  4  5  ... |
     | 30 0d 0a 0d 0a     ...-|0....           |

[reactor.ipc.netty.http.client.HttpClientOperations:218] Received last HTTP packet
[reactor.ipc.netty.http.client.HttpClient:71] - USER_EVENT: [Handler Terminated]
[reactor.ipc.netty.channel.ChannelOperationsHandler:218] - Disposing context reactor.ipc.netty.channel.PooledClientContextHandler@45a97ad8
[reactor.ipc.netty.channel.PooledClientContextHandler:218] - Releasing channel: 
[reactor.ipc.netty.resources.DefaultPoolResources:218] - Released, now 0 active connections
[reactor.ipc.netty.http.client.HttpClient:71] - READ COMPLETE
[reactor.ipc.netty.http.client.HttpClient:71] - READ COMPLETE
[reactor.ipc.netty.http.client.HttpClient:71] - CLOSE
[reactor.ipc.netty.http.client.HttpClient:71] - INACTIVE
[reactor.ipc.netty.http.client.HttpClient:71] - UNREGISTERED

很明显WebClient内部使用了Netty HttpClient,这个客户端把字符串“0”当作"lastHttpMessage",然后关闭连接。

我的问题:

您看到的 "0" 是 HTTP 规范的一部分 - 当服务器发送 HTTP 分块响应时,大小为 0 的块表示响应已完成。

我不认为我们可以在 Netty 中配置任何东西来改变它,我也不认为我们应该这样做,因为这是 HTTP 的预期行为。

现在我想知道为什么服务器首先要发送它。可能是服务器上的 Flux 已到达终点并正在发送 onComplete 信号,终止响应。您的服务器实现中可能有一些东西可以完成流。

您可以通过在管道中添加 log Reactor 运算符来查看哪个部分正在发送 onComplete 信号,从而尝试找出服务器中发生的情况。如果您有代码片段向我们展示,我认为您可以创建另一个问题,因为这个问题对 Spring 社区已经非常有用,让它变得更复杂并没有多大帮助。

Spring 框架将通过 WebClient 支持其他客户端库 - 例如,Jetty 客户端(请参阅问题 SPR-15092。但我认为使用其他客户端不会解决此问题问题,因为它符合 HTTP 而不是特定于客户端的行为。

我发现如果我使用"repeat",Flux会在连接完成后自动重新连接。甜蜜!

final Flux<Object> stream = WebClient
            .create("http://localhost:8080/ssp")
            .get().uri("/common/v1/eventstream")
            .retrieve()
            .bodyToFlux(ServerSentEvent.class)
            .flatMap(e -> Mono.justOrEmpty(e.data()))
            .repeat();

    stream.subscribe(e -> System.out.println(e));