Java 如何消费流式 HTTP 响应?

How To Consume Stream HTTP Response In Java?

我在尝试使用持续流式传输实时事件的 HTTP 端点的响应时遇到问题。它实际上是 Docker 的端点之一:https://docs.docker.com/engine/api/v1.40/#operation/SystemEvents

我正在使用 Apache HTTP 客户端 4.5.5,当我尝试使用内容 InputStream 时它会无限期停止:

HttpEntity entity = resp.getEntity();
EntityUtils.consume(entity);//it just hangs here.
//Even if I don't call this method, Apache calls it automatically
//after running all my ResponseHandlers

显然,可以使用 JDK 的原始 URL 来完成:

但我不能那样做,因为本地 Docker 通过 Unix 套接字进行通信,我只能在 Apache 的 HTTP 客户端中使用 Java.[=15 中的 Unix 套接字的第 3 方库进行配置=]

如果有一个我可以切换到的更智能的 HTTP 客户端库,那也是一个选择。

如有任何想法,我们将不胜感激。谢谢!

我设法通过从响应 InputStream 生成 JsonObject 的无限 java.util.stream.Stream 来解决这个问题(我知道 json 阅读部分不是最优雅的解决方案,但有API 没有更好的方法,而且 Docker 不会在 json 之间发送任何分隔符)。

final InputStream content = response.getEntity().getContent();
final Stream<JsonObject> stream = Stream.generate(
    () -> {
        JsonObject read = null;
        try {
            final byte[] tmp = new byte[4096];
            while (content.read(tmp) != -1) {
                try {
                    final JsonReader reader = Json.createReader(
                        new ByteArrayInputStream(tmp)
                    );
                    read = reader.readObject();
                    break;
                } catch (final Exception exception) {
                    //Couldn't parse byte[] to Json,
                    //try to read more bytes.
                }
            }
        } catch (final IOException ex) {
            throw new IllegalStateException(
                "IOException when reading streamed JsonObjects!"
            );
        }
        return read;
    }
).onClose(
    () -> {
        try {
             ((CloseableHttpResponse) response).close();
        } catch (final IOException ex) {
            //There is a bug in Apache HTTPClient, when closing
            //an infinite InputStream: IOException is thrown
            //because the client still tries to read the remainder
            // of the closed Stream. We should ignore this case.
        }
    }
);
return stream;