使用 Reactor Netty 客户端一次读取大 JSON 有效负载,而不是分块读取以保持长期持久连接

Read big JSON payload all at once with Reactor Netty client, and not in chunks for keep-alive long lived persistent connection

为了描述我正在尝试 solve/implement 的用例,我们与第 3 方服务集成,该服务通过 HTTP/1 发送连续的数据流 (JSON) .1 long-living/persistent 连接(最多可以保持打开状态 2-3 小时,这是另一个问题,关于连接池在这种情况下的行为方式,因为 HTTP 客户端上的所有连接仅连接到此主机)。但是我现在遇到的问题,我像下面这样连接订阅数据

 client
            .get()
            .uri("/")
            .responseContent()
            .asString()
             // .aggregate() // 1
            .subscribe(content -> {
                  logger.info("Running content on thread {}: {}",
                        Thread.currentThread().getName(), content);
            });

对于小 JSON 它工作正常,对于大 JSON,content 包含完整 JSON 的一部分,所以它本身不是有效 JSON 或完整数据。是否有可能始终获取整个数据,无论一个块中的大小如何(有没有办法设置一个大的 ByteBuf 大小以适应任何 JSON 有效载荷到一个响应中?)或者如果没有,如何您可以等待并将相同 JSON 响应的多个部分组合成完整有效的 JSON?

举个例子,我的意思是(我减少了示例中 JSON 有效负载的大小,但只是为了演示这个想法),说服务器发送以下内容 JSON:

{"id":1, data: [1, 2, 3]}

在客户端,我得到 3 个块中的 data/response,即

(然后在第 3 块之后我在日志中看到 READ COMPLETE,见下文)。

如果我启用 aggregate() 则根本不会打印任何内容,据我所知,它将等待连接关闭,但由于它是持久的长期连接,因此无法正常工作。

有趣的部分是,如果在客户端启用 .wiretap(true),当大 JSON 被拆分为多个 ByteBuf-s 时,它会在日志中打印 READ COMPLETE只有当完整的 JSON 内容被消费时(不是在每个单独的 ByteBuf 部分之后),这可能意味着客户端知道来自服务器的单个数据响应的处理何时结束。

我发现有 .httpResponseDecoder 因为它看起来像客户端将块限制为 16kb,虽然我将它扩展到 2mb,即 .httpResponseDecoder(spec -> spec.maxChunkSize(2 * 1024 * 1024)),但仍然没有得到整个 JSON 作为单个数据块,仍然看到每个块最大 16kb 的限制。

知道如何实现这个或在哪里查看吗?

在 Gitter https://gitter.im/reactor/reactor-netty 中,Violeta Georgieva 建议尝试使用 JsonObjectDecoder,将以下内容添加到客户端:

client.doOnConnected(connection -> connection.addHandler(new JsonObjectDecoder()))

现在每个订阅处理都得到正确的 JSON。

您可以减少通量并连接数据以获得整个响应体。

    Mono<byte[]> responses = client
        .baseUrl(baseUrl)
        .get()
        .uri(uri)
        .response(((response, byteBufFlux) -> {
            // Check status code here if you want
            return byteBufFlux;
        }))
        .map(Utils::readByteBuf) // It reads ByteBuf to a byte array
        .reduce(this::concatByteArray);
        
  private byte[] concatByteArray(byte[] dataPart1, byte[] dataPart2) {
    byte[] data = new byte[dataPart1.length + dataPart2.length];
    System.arraycopy(dataPart1, 0, data, 0, dataPart1.length);
    System.arraycopy(dataPart2, 0, data, dataPart1.length, dataPart2.length);
    return data;
  }

并且您可以优化它以使用 ByteBuf 而不是字节数组。