如何使用 Java 的 HttpClient 在慢速流响应 body 上超时
How to timeout on a slow streaming response body with Java's HttpClient
当我需要以流方式处理响应时,我应该如何处理挂起使用 Java 11 中包含的 HTTP 客户端发送 HTTP 响应 body 的服务器?
阅读文档后,我知道可以设置 timeout on connection and a timeout on the request:
HttpClient httpClient = HttpClient.newBuilder()
.connectTimeout(Duration.ofSeconds(2))
.build();
HttpRequest httpRequest = HttpRequest.newBuilder(URI.create("http://example.com"))
.timeout(Duration.ofSeconds(5))
.build();
HttpResponse<Stream<String>> httpResponse = httpClient
.send(httpRequest, HttpResponse.BodyHandlers.ofLines());
Stream<String> responseLineStream = httpResponse.body();
responseLineStream.count();
在上面的代码中:
- 如果2秒内无法建立连接,将抛出超时异常。
- 如果5秒内没有收到响应,将抛出超时异常。经实验,连接建立后开始计时,对于这种
BodyHandler
,收到状态行和headers就认为收到了响应。
这意味着当代码执行时,在 7 秒内将抛出异常,或者我们将到达最后一行。但是,最后一行不受任何超时限制。如果服务器停止发送响应 body,最后一行将永远阻塞。
在这种情况下如何防止最后一行挂起?
我猜这留给了流的消费者,因为这是处理逻辑的一部分,所以主体处理仍然可以用 CompletableFuture
:
HttpResponse<Stream<String>> httpResponse = httpClient.send(httpRequest,
HttpResponse.BodyHandlers.ofLines());
Stream<String> responseLineStream = httpResponse.body();
CompletableFuture<Long> future = CompletableFuture.supplyAsync(() -> responseLineStream.count());
long count = future.get(3, TimeUnit.SECONDS);
或者只是 Future
由 Java Executor
执行。
解决这个问题的一种方法是对接收整个正文所花费的时间设置超时。这就是 M A 的解决方案所做的。正如您所注意到的,如果超时评估,您应该关闭流,以便正确释放连接而不是挂在后台。一种更通用的方法是实现一个 BodySubscriber
,当上游未在超时时间内完成时,它会异常完成自身。这样就不必使用 sendAsync
或关闭流。这是一个合适的实现。
class TimeoutBodySubscriber<T> implements BodySubscriber<T> {
private final BodySubscriber<T> downstream;
private final Duration timeout;
private Subscription subscription;
/** Make sure downstream isn't called after we receive an onComplete or onError. */
private boolean done;
TimeoutBodySubscriber(BodySubscriber<T> downstream, Duration timeout) {
this.downstream = downstream;
this.timeout = timeout;
}
@Override
public CompletionStage<T> getBody() {
return downstream.getBody();
}
@Override
public synchronized void onSubscribe(Subscription subscription) {
this.subscription = requireNonNull(subscription);
downstream.onSubscribe(subscription);
// Schedule an error completion to be fired when timeout evaluates
CompletableFuture.delayedExecutor(timeout.toMillis(), TimeUnit.MILLISECONDS)
.execute(this::onTimeout);
}
private synchronized void onTimeout() {
if (!done) {
done = true;
downstream.onError(new HttpTimeoutException("body completion timed out"));
// Cancel subscription to release the connection, so it doesn't keep hanging in background
subscription.cancel();
}
}
@Override
public synchronized void onNext(List<ByteBuffer> item) {
if (!done) {
downstream.onNext(item);
}
}
@Override
public synchronized void onError(Throwable throwable) {
if (!done) {
done = true;
downstream.onError(throwable);
}
}
@Override
public synchronized void onComplete() {
if (!done) {
done = true;
downstream.onComplete();
}
}
static <T> BodyHandler<T> withBodyTimeout(BodyHandler<T> handler, Duration timeout) {
return responseInfo -> new TimeoutBodySubscriber<>(handler.apply(responseInfo), timeout);
}
}
可以这样使用:
Duration timeout = Duration.ofSeconds(10);
HttpResponse<Stream<String>> httpResponse = httpClient
.send(httpRequest, TimeoutBodySubscriber.withTimeout(HttpResponse.BodyHandlers.ofLines(), timeout));
另一种方法是使用读取超时。这更加灵活,因为只要服务器保持活动状态(即继续发送内容),响应就不会超时。如果 BodySubscriber
在超时内没有收到下一个请求的信号,您将需要一个异常完成的自身。这实现起来稍微复杂一些。如果您不介意依赖关系,则可以使用 Methanol。它按照描述实现读取超时。
Duration timeout = Duration.ofSeconds(3);
HttpResponse<Stream<String>> httpResponse = httpClient
.send(httpRequest, MoreBodyHandlers.withReadTimeout(HttpResponse.BodyHandlers.ofLines(), timeout));
另一种策略是结合使用两者:一旦服务器变得不活动或正文完成时间太长就超时。
当我需要以流方式处理响应时,我应该如何处理挂起使用 Java 11 中包含的 HTTP 客户端发送 HTTP 响应 body 的服务器?
阅读文档后,我知道可以设置 timeout on connection and a timeout on the request:
HttpClient httpClient = HttpClient.newBuilder()
.connectTimeout(Duration.ofSeconds(2))
.build();
HttpRequest httpRequest = HttpRequest.newBuilder(URI.create("http://example.com"))
.timeout(Duration.ofSeconds(5))
.build();
HttpResponse<Stream<String>> httpResponse = httpClient
.send(httpRequest, HttpResponse.BodyHandlers.ofLines());
Stream<String> responseLineStream = httpResponse.body();
responseLineStream.count();
在上面的代码中:
- 如果2秒内无法建立连接,将抛出超时异常。
- 如果5秒内没有收到响应,将抛出超时异常。经实验,连接建立后开始计时,对于这种
BodyHandler
,收到状态行和headers就认为收到了响应。
这意味着当代码执行时,在 7 秒内将抛出异常,或者我们将到达最后一行。但是,最后一行不受任何超时限制。如果服务器停止发送响应 body,最后一行将永远阻塞。
在这种情况下如何防止最后一行挂起?
我猜这留给了流的消费者,因为这是处理逻辑的一部分,所以主体处理仍然可以用 CompletableFuture
:
HttpResponse<Stream<String>> httpResponse = httpClient.send(httpRequest,
HttpResponse.BodyHandlers.ofLines());
Stream<String> responseLineStream = httpResponse.body();
CompletableFuture<Long> future = CompletableFuture.supplyAsync(() -> responseLineStream.count());
long count = future.get(3, TimeUnit.SECONDS);
或者只是 Future
由 Java Executor
执行。
解决这个问题的一种方法是对接收整个正文所花费的时间设置超时。这就是 M A 的解决方案所做的。正如您所注意到的,如果超时评估,您应该关闭流,以便正确释放连接而不是挂在后台。一种更通用的方法是实现一个 BodySubscriber
,当上游未在超时时间内完成时,它会异常完成自身。这样就不必使用 sendAsync
或关闭流。这是一个合适的实现。
class TimeoutBodySubscriber<T> implements BodySubscriber<T> {
private final BodySubscriber<T> downstream;
private final Duration timeout;
private Subscription subscription;
/** Make sure downstream isn't called after we receive an onComplete or onError. */
private boolean done;
TimeoutBodySubscriber(BodySubscriber<T> downstream, Duration timeout) {
this.downstream = downstream;
this.timeout = timeout;
}
@Override
public CompletionStage<T> getBody() {
return downstream.getBody();
}
@Override
public synchronized void onSubscribe(Subscription subscription) {
this.subscription = requireNonNull(subscription);
downstream.onSubscribe(subscription);
// Schedule an error completion to be fired when timeout evaluates
CompletableFuture.delayedExecutor(timeout.toMillis(), TimeUnit.MILLISECONDS)
.execute(this::onTimeout);
}
private synchronized void onTimeout() {
if (!done) {
done = true;
downstream.onError(new HttpTimeoutException("body completion timed out"));
// Cancel subscription to release the connection, so it doesn't keep hanging in background
subscription.cancel();
}
}
@Override
public synchronized void onNext(List<ByteBuffer> item) {
if (!done) {
downstream.onNext(item);
}
}
@Override
public synchronized void onError(Throwable throwable) {
if (!done) {
done = true;
downstream.onError(throwable);
}
}
@Override
public synchronized void onComplete() {
if (!done) {
done = true;
downstream.onComplete();
}
}
static <T> BodyHandler<T> withBodyTimeout(BodyHandler<T> handler, Duration timeout) {
return responseInfo -> new TimeoutBodySubscriber<>(handler.apply(responseInfo), timeout);
}
}
可以这样使用:
Duration timeout = Duration.ofSeconds(10);
HttpResponse<Stream<String>> httpResponse = httpClient
.send(httpRequest, TimeoutBodySubscriber.withTimeout(HttpResponse.BodyHandlers.ofLines(), timeout));
另一种方法是使用读取超时。这更加灵活,因为只要服务器保持活动状态(即继续发送内容),响应就不会超时。如果 BodySubscriber
在超时内没有收到下一个请求的信号,您将需要一个异常完成的自身。这实现起来稍微复杂一些。如果您不介意依赖关系,则可以使用 Methanol。它按照描述实现读取超时。
Duration timeout = Duration.ofSeconds(3);
HttpResponse<Stream<String>> httpResponse = httpClient
.send(httpRequest, MoreBodyHandlers.withReadTimeout(HttpResponse.BodyHandlers.ofLines(), timeout));
另一种策略是结合使用两者:一旦服务器变得不活动或正文完成时间太长就超时。