如何完成不是字节数组输入流的异步 HTTP 客户端输入流?

How to accomplish an async HTTP client input stream that isn't byte array input stream?

我正在使用 Async Http Client 从 Internet 下载大量(可能很大)文件。

在我的特定情况下,我需要将字节的 InputStream 从这些下载 URL 发送到另一个服务进行解析。

一个天真的方法是这样做:

AsyncHttpClient asyncHttpClient = Dsl.asyncHttpClient(Dsl.config()
    .setMaxConnectionsPerHost(-1)
    .setMaxConnections(-1)
    .setPooledConnectionIdleTimeout(60 * 10 * 1000)
    .setConnectionTtl(6 * 60 * 1000)
    .setConnectTimeout(5 * 1000)
    .setRequestTimeout(5 * 60 * 1000)
    .setFollowRedirect(true)
    .setRealm(new Realm.Builder(username, password)
        .setNtlmDomain(domain)
        .setScheme(Realm.AuthScheme.NTLM)
        .build())
Response httpGetResponse = asyncHttpClient.prepareGet(url).execute().get();
return httpGetResponse.getResponseBodyAsStream();

但是 in this tutorial for async http requests 我们了解到,与 HTTP 组件 http 客户端不同,异步 http 客户端会将整个文件下载到内存中。

就我而言,这将很快导致 OOM。

所以备选方案是这样的:

Response httpGetResponse = asyncHttpClient.prepareGet(url).execute(new AsyncHandler<Response>() {
    private final Response.ResponseBuilder builder = new Response.ResponseBuilder();

    @Override
    public State onBodyPartReceived(HttpResponseBodyPart bodyPart) throws Exception {
      bodyPart.getBodyByteBuffer(); // Each chunk of bytes will be fed into this method.
                                    // I need to write these bytes to the resuting input stream
                                    // without streaming them all into memory.
      return State.CONTINUE;
    }

    @Override
    public State onHeadersReceived(HttpHeaders headers) throws Exception {
      builder.accumulate(headers);
      return State.CONTINUE;
    }

    @Override
    public State onStatusReceived(HttpResponseStatus responseStatus) throws Exception {
      builder.accumulate(responseStatus);
      return State.CONTINUE;
    }

    @Override
    public Response onCompleted() throws Exception {
      return builder.build();
    }

    @Override
    public void onThrowable(Throwable t) {

    }
  }).get();

在输入流中获取这些字节的最简单、最干净的方法是什么?

我有两个想法:

1) 将输入写入文件,然后流式传输文件 要么 2) Return 一个管道输入流,字节将在收到时写入管道输入流。

有没有人可以分享一个可行的例子?

我正确地假设有人已经这样做了。事实上,在我搜索 "async http client" 和 "piped input stream" 之后,我在项目本身中找到了这个:

https://github.com/AsyncHttpClient/async-http-client/blob/master/client/src/main/java/org/asynchttpclient/handler/BodyDeferringAsyncHandler.java

用法:

  PipedInputStream pipedInputStream = new PipedInputStream();
  PipedOutputStream pipedOutputStream = new PipedOutputStream(pipedInputStream);
  BodyDeferringAsyncHandler bodyDeferringAsyncHandler = new BodyDeferringAsyncHandler(pipedOutputStream);
  Future<Response> futureResponse = asyncHttpClient.prepareGet(url).execute(bodyDeferringAsyncHandler);
  Response response = bodyDeferringAsyncHandler.getResponse();
  if (response.getStatusCode() == 200) {
    return new BodyDeferringAsyncHandler.BodyDeferringInputStream(futureResponse,
        bodyDeferringAsyncHandler,
        pipedInputStream);
  } else {
    return null;
  }