如何使用 Java 的 HttpClient 在慢速流响应 body 上超时

How to timeout on a slow streaming response body with Java's HttpClient

当我需要以流方式处理响应时,我应该如何处理挂起使用 Java 11 中包含的 HTTP 客户端发送 HTTP 响应 body 的服务器?

阅读文档后,我知道可以设置 timeout on connection and a timeout on the request:

HttpClient httpClient = HttpClient.newBuilder()
        .connectTimeout(Duration.ofSeconds(2))
        .build();

HttpRequest httpRequest = HttpRequest.newBuilder(URI.create("http://example.com"))
        .timeout(Duration.ofSeconds(5))
        .build();

HttpResponse<Stream<String>> httpResponse = httpClient
        .send(httpRequest, HttpResponse.BodyHandlers.ofLines());

Stream<String> responseLineStream = httpResponse.body();
responseLineStream.count();

在上面的代码中:

这意味着当代码执行时,在 7 秒内将抛出异常,或者我们将到达最后一行。但是,最后一行不受任何超时限制。如果服务器停止发送响应 body,最后一行将永远阻塞。

在这种情况下如何防止最后一行挂起?

我猜这留给了流的消费者,因为这是处理逻辑的一部分,所以主体处理仍然可以用 CompletableFuture:

HttpResponse<Stream<String>> httpResponse = httpClient.send(httpRequest,
                                                            HttpResponse.BodyHandlers.ofLines());

Stream<String> responseLineStream = httpResponse.body();
CompletableFuture<Long> future = CompletableFuture.supplyAsync(() -> responseLineStream.count());
long count = future.get(3, TimeUnit.SECONDS);

或者只是 Future 由 Java Executor 执行。

解决这个问题的一种方法是对接收整个正文所花费的时间设置超时。这就是 M A 的解决方案所做的。正如您所注意到的,如果超时评估,您应该关闭流,以便正确释放连接而不是挂在后台。一种更通用的方法是实现一个 BodySubscriber ,当上游未在超时时间内完成时,它会异常完成自身。这样就不必使用 sendAsync 或关闭流。这是一个合适的实现。

class TimeoutBodySubscriber<T> implements BodySubscriber<T> {
  private final BodySubscriber<T> downstream;
  private final Duration timeout;
  private Subscription subscription;

  /** Make sure downstream isn't called after we receive an onComplete or onError. */
  private boolean done;

  TimeoutBodySubscriber(BodySubscriber<T> downstream, Duration timeout) {
    this.downstream = downstream;
    this.timeout = timeout;
  }

  @Override
  public CompletionStage<T> getBody() {
    return downstream.getBody();
  }

  @Override
  public synchronized void onSubscribe(Subscription subscription) {
    this.subscription = requireNonNull(subscription);
    downstream.onSubscribe(subscription);

    // Schedule an error completion to be fired when timeout evaluates
    CompletableFuture.delayedExecutor(timeout.toMillis(), TimeUnit.MILLISECONDS)
        .execute(this::onTimeout);
  }

  private synchronized void onTimeout() {
    if (!done) {
      done = true;
      downstream.onError(new HttpTimeoutException("body completion timed out"));

      // Cancel subscription to release the connection, so it doesn't keep hanging in background
      subscription.cancel();
    }
  }

  @Override
  public synchronized void onNext(List<ByteBuffer> item) {
    if (!done) {
      downstream.onNext(item);
    }
  }

  @Override
  public synchronized void onError(Throwable throwable) {
    if (!done) {
      done = true;
      downstream.onError(throwable);
    }
  }

  @Override
  public synchronized void onComplete() {
    if (!done) {
      done = true;
      downstream.onComplete();
    }
  }

  static <T> BodyHandler<T> withBodyTimeout(BodyHandler<T> handler, Duration timeout) {
    return responseInfo -> new TimeoutBodySubscriber<>(handler.apply(responseInfo), timeout);
  }
}

可以这样使用:

Duration timeout = Duration.ofSeconds(10);
HttpResponse<Stream<String>> httpResponse = httpClient
        .send(httpRequest, TimeoutBodySubscriber.withTimeout(HttpResponse.BodyHandlers.ofLines(), timeout));

另一种方法是使用读取超时。这更加灵活,因为只要服务器保持活动状态(即继续发送内容),响应就不会超时。如果 BodySubscriber 在超时内没有收到下一个请求的信号,您将需要一个异常完成的自身。这实现起来稍微复杂一些。如果您不介意依赖关系,则可以使用 Methanol。它按照描述实现读取超时。

Duration timeout = Duration.ofSeconds(3);
HttpResponse<Stream<String>> httpResponse = httpClient
    .send(httpRequest, MoreBodyHandlers.withReadTimeout(HttpResponse.BodyHandlers.ofLines(), timeout));

另一种策略是结合使用两者:一旦服务器变得不活动或正文完成时间太长就超时。