Java 11 HttpClient 取消http请求
Cancellation of http request in Java 11 HttpClient
我正在尝试通过新 Java 11 HttpClient 取消 http 请求。
这是我的测试代码:
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
public class App {
public static void main(String... args) throws InterruptedException {
HttpClient client = HttpClient.newBuilder().build();
URI uri = URI.create("http://releases.ubuntu.com/18.04.2/ubuntu-18.04.2-desktop-amd64.iso");
HttpRequest request = HttpRequest.newBuilder().uri(uri).GET().build();
var bodyHandler = HttpResponse.BodyHandlers.ofByteArrayConsumer(b -> System.out.println("#"));
var future = client.sendAsync(request, bodyHandler);
Thread.sleep(1000);
future.cancel(true);
System.out.println("\r\n----------CANCEL!!!------------");
System.out.println("\r\nisCancelled: " + future.isCancelled());
Thread.sleep(250);
}
}
我预计,该请求任务将在 future.cancel(true);
行调用后立即取消。因此,控制台中最后打印的行应该是 isCancelled: true
但是,当我 运行 这段代码时,我看到了这样的东西:
####################################################################################################
----------CANCEL!!!------------
####
isCancelled: true
#######################################################################################################################################################
这意味着,请求任务在我取消后仍然 运行ning...
那么,这是取消请求的正确方法吗?
UPD
取消请求的正确方法是(正如 daniel 所建议的那样,+ UPD2:在 cancel()
方法调用时避免 NPE):
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.net.http.HttpResponse.BodyHandler;
import java.net.http.HttpResponse.BodySubscriber;
import java.net.http.HttpResponse.ResponseInfo;
import java.nio.ByteBuffer;
import java.util.List;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Flow.Subscription;
public class App {
private static class SubscriberWrapper implements BodySubscriber<Void> {
private final CountDownLatch latch;
private final BodySubscriber<Void> subscriber;
private Subscription subscription;
private SubscriberWrapper(BodySubscriber<Void> subscriber, CountDownLatch latch) {
this.subscriber = subscriber;
this.latch = latch;
}
@Override
public CompletionStage<Void> getBody() {
return subscriber.getBody();
}
@Override
public void onSubscribe(Subscription subscription) {
subscriber.onSubscribe(subscription);
this.subscription = subscription;
latch.countDown();
}
@Override
public void onNext(List<ByteBuffer> item) {
subscriber.onNext(item);
}
@Override
public void onError(Throwable throwable) {
subscriber.onError(throwable);
}
@Override
public void onComplete() {
subscriber.onComplete();
}
public void cancel() {
subscription.cancel();
System.out.println("\r\n----------CANCEL!!!------------");
}
}
private static class BodyHandlerWrapper implements BodyHandler<Void> {
private final CountDownLatch latch = new CountDownLatch(1);
private final BodyHandler<Void> handler;
private SubscriberWrapper subscriberWrapper;
private BodyHandlerWrapper(BodyHandler<Void> handler) {
this.handler = handler;
}
@Override
public BodySubscriber<Void> apply(ResponseInfo responseInfo) {
subscriberWrapper = new SubscriberWrapper(handler.apply(responseInfo), latch);
return subscriberWrapper;
}
public void cancel() {
CompletableFuture.runAsync(() -> {
try {
latch.await();
subscriberWrapper.cancel();
} catch (InterruptedException e) {}
});
}
}
public static void main(String... args) throws InterruptedException, ExecutionException {
HttpClient client = HttpClient.newBuilder().build();
URI uri = URI.create("http://releases.ubuntu.com/18.04.2/ubuntu-18.04.2-desktop-amd64.iso");
HttpRequest request = HttpRequest.newBuilder().uri(uri).GET().build();
var handler = HttpResponse.BodyHandlers.ofByteArrayConsumer(b -> System.out.print("#"));
BodyHandlerWrapper handlerWrapper = new BodyHandlerWrapper(handler);
client.sendAsync(request, handlerWrapper).thenAccept(b -> System.out.println(b.statusCode()));
Thread.sleep(1000);
handlerWrapper.cancel();
System.out.println("\r\n------Invoke cancel...---------");
Thread.sleep(2500);
}
}
同步VS异步
可以同步或异步发送请求。同步 API 阻塞,直到 Http 响应可用
HttpResponse<String> response =
client.send(request, BodyHandlers.ofString());
System.out.println(response.statusCode());
System.out.println(response.body());
异步 API returns 立即与 CompletableFuture 一起完成,当 HttpResponse 可用时。 CompletableFuture 是在 Java 8 中添加的,支持可组合的异步编程。
client.sendAsync(request, BodyHandlers.ofString())
.thenApply(response -> { System.out.println(response.statusCode());
return response; } )
.thenApply(HttpResponse::body)
.thenAccept(System.out::println);
未来对象
A Future represents the result of an asynchronous computation. Java Doc
意味着它不是同步函数,您的假设 "I expect, that request task will be cancelled right after" 仅适用于同步方法。
检查 Future 对象的取消
如果您想检查您的任务是否被取消,有一个有用的 isCancelled()
方法。
if(future.isCancelled()) {
// Future object is cancelled, do smth
} else {
// Future object is still running, do smth
}
sendAsync() returns CompletableFuture 对象
方法sendAsync()
returns一个CompletableFuture。请注意,CompletableFuture
实现了 Future
.
的接口
您可以这样做:
client.sendAsync(request, BodyHandlers.ofString())
.thenAccept(response -> {
// do action when completed;
});
在技术术语中,thenAccept
方法添加了一个 Consumer
以在响应可用时调用。
为什么 CompeletableFuture 上的取消方法不起作用
由于(与 FutureTask
不同)此 class 无法直接控制导致其完成的计算,因此取消仅被视为异常完成的另一种形式。方法 cancel 与 completeExceptionally(new CancellationException())
具有相同的效果。方法 isCompletedExceptionally()
可用于确定 CompletableFuture
是否以异常方式完成。
如果使用 CompletionException
异常完成,方法 get()
和 get(long, TimeUnit)
会抛出一个 ExecutionException
,其原因与相应的 CompletionException
中的原因相同].为了简化大多数情况下的使用,此 class 还定义了方法 join()
和 getNow(T),它们在这些情况下直接抛出 CompletionException
。
换句话说
cancel()
方法不使用中断来取消,这就是它不起作用的原因。你应该使用 completeExceptionally(new CancellationException())
参考
您可以使用 java.net.http.HttpClient
API 通过取消传递给响应的 BodySubscriber
的 Flow.Subscription
对象来取消 HTTP 请求。
简单地包装所提供的 BodyHandler
/BodySubscriber
实现之一以获取订阅对象应该相对容易。不幸的是,客户端返回的 CompletableFuture
的 cancel
方法与传递给 BodySubscriber
的 Flow.Subscription
的 cancel
方法之间没有任何关系。取消请求的正确方法是通过订阅的cancel
方法。
取消订阅将适用于同步 (HttpClient::send
) 和异步 (HttpClient::sendAsync
) 方法。
但是,它会产生不同的效果,具体取决于请求是通过 HTTP/1.1 还是 HTTP/2.0 发送的(使用 HTTP/1.1 将导致连接关闭,使用 HTTP/2.0 它将导致流被重置)。当然,如果响应的最后一个字节已经传送到 BodySubscriber
.
,它可能根本没有任何影响
更新: 自 Java 16 以来,可以通过中断调用 HttpClient::send or by invoking cancel(true)
on the CompletableFuture
returned by HttpClient::sendAsync. This has been implemented by JDK-8245462
的线程来取消请求
至少对于同步请求,您可以中断正在调用的线程 httpClient.send(..)
然后 http 客户端尽快中止请求并抛出 InterruptedException
本身。
我正在尝试通过新 Java 11 HttpClient 取消 http 请求。
这是我的测试代码:
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
public class App {
public static void main(String... args) throws InterruptedException {
HttpClient client = HttpClient.newBuilder().build();
URI uri = URI.create("http://releases.ubuntu.com/18.04.2/ubuntu-18.04.2-desktop-amd64.iso");
HttpRequest request = HttpRequest.newBuilder().uri(uri).GET().build();
var bodyHandler = HttpResponse.BodyHandlers.ofByteArrayConsumer(b -> System.out.println("#"));
var future = client.sendAsync(request, bodyHandler);
Thread.sleep(1000);
future.cancel(true);
System.out.println("\r\n----------CANCEL!!!------------");
System.out.println("\r\nisCancelled: " + future.isCancelled());
Thread.sleep(250);
}
}
我预计,该请求任务将在 future.cancel(true);
行调用后立即取消。因此,控制台中最后打印的行应该是 isCancelled: true
但是,当我 运行 这段代码时,我看到了这样的东西:
#################################################################################################### ----------CANCEL!!!------------ #### isCancelled: true #######################################################################################################################################################
这意味着,请求任务在我取消后仍然 运行ning... 那么,这是取消请求的正确方法吗?
UPD
取消请求的正确方法是(正如 daniel 所建议的那样,+ UPD2:在 cancel()
方法调用时避免 NPE):
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.net.http.HttpResponse.BodyHandler;
import java.net.http.HttpResponse.BodySubscriber;
import java.net.http.HttpResponse.ResponseInfo;
import java.nio.ByteBuffer;
import java.util.List;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Flow.Subscription;
public class App {
private static class SubscriberWrapper implements BodySubscriber<Void> {
private final CountDownLatch latch;
private final BodySubscriber<Void> subscriber;
private Subscription subscription;
private SubscriberWrapper(BodySubscriber<Void> subscriber, CountDownLatch latch) {
this.subscriber = subscriber;
this.latch = latch;
}
@Override
public CompletionStage<Void> getBody() {
return subscriber.getBody();
}
@Override
public void onSubscribe(Subscription subscription) {
subscriber.onSubscribe(subscription);
this.subscription = subscription;
latch.countDown();
}
@Override
public void onNext(List<ByteBuffer> item) {
subscriber.onNext(item);
}
@Override
public void onError(Throwable throwable) {
subscriber.onError(throwable);
}
@Override
public void onComplete() {
subscriber.onComplete();
}
public void cancel() {
subscription.cancel();
System.out.println("\r\n----------CANCEL!!!------------");
}
}
private static class BodyHandlerWrapper implements BodyHandler<Void> {
private final CountDownLatch latch = new CountDownLatch(1);
private final BodyHandler<Void> handler;
private SubscriberWrapper subscriberWrapper;
private BodyHandlerWrapper(BodyHandler<Void> handler) {
this.handler = handler;
}
@Override
public BodySubscriber<Void> apply(ResponseInfo responseInfo) {
subscriberWrapper = new SubscriberWrapper(handler.apply(responseInfo), latch);
return subscriberWrapper;
}
public void cancel() {
CompletableFuture.runAsync(() -> {
try {
latch.await();
subscriberWrapper.cancel();
} catch (InterruptedException e) {}
});
}
}
public static void main(String... args) throws InterruptedException, ExecutionException {
HttpClient client = HttpClient.newBuilder().build();
URI uri = URI.create("http://releases.ubuntu.com/18.04.2/ubuntu-18.04.2-desktop-amd64.iso");
HttpRequest request = HttpRequest.newBuilder().uri(uri).GET().build();
var handler = HttpResponse.BodyHandlers.ofByteArrayConsumer(b -> System.out.print("#"));
BodyHandlerWrapper handlerWrapper = new BodyHandlerWrapper(handler);
client.sendAsync(request, handlerWrapper).thenAccept(b -> System.out.println(b.statusCode()));
Thread.sleep(1000);
handlerWrapper.cancel();
System.out.println("\r\n------Invoke cancel...---------");
Thread.sleep(2500);
}
}
同步VS异步
可以同步或异步发送请求。同步 API 阻塞,直到 Http 响应可用
HttpResponse<String> response =
client.send(request, BodyHandlers.ofString());
System.out.println(response.statusCode());
System.out.println(response.body());
异步 API returns 立即与 CompletableFuture 一起完成,当 HttpResponse 可用时。 CompletableFuture 是在 Java 8 中添加的,支持可组合的异步编程。
client.sendAsync(request, BodyHandlers.ofString())
.thenApply(response -> { System.out.println(response.statusCode());
return response; } )
.thenApply(HttpResponse::body)
.thenAccept(System.out::println);
未来对象
A Future represents the result of an asynchronous computation. Java Doc
意味着它不是同步函数,您的假设 "I expect, that request task will be cancelled right after" 仅适用于同步方法。
检查 Future 对象的取消
如果您想检查您的任务是否被取消,有一个有用的 isCancelled()
方法。
if(future.isCancelled()) {
// Future object is cancelled, do smth
} else {
// Future object is still running, do smth
}
sendAsync() returns CompletableFuture 对象
方法sendAsync()
returns一个CompletableFuture。请注意,CompletableFuture
实现了 Future
.
您可以这样做:
client.sendAsync(request, BodyHandlers.ofString())
.thenAccept(response -> {
// do action when completed;
});
在技术术语中,thenAccept
方法添加了一个 Consumer
以在响应可用时调用。
为什么 CompeletableFuture 上的取消方法不起作用
由于(与 FutureTask
不同)此 class 无法直接控制导致其完成的计算,因此取消仅被视为异常完成的另一种形式。方法 cancel 与 completeExceptionally(new CancellationException())
具有相同的效果。方法 isCompletedExceptionally()
可用于确定 CompletableFuture
是否以异常方式完成。
如果使用 CompletionException
异常完成,方法 get()
和 get(long, TimeUnit)
会抛出一个 ExecutionException
,其原因与相应的 CompletionException
中的原因相同].为了简化大多数情况下的使用,此 class 还定义了方法 join()
和 getNow(T),它们在这些情况下直接抛出 CompletionException
。
换句话说
cancel()
方法不使用中断来取消,这就是它不起作用的原因。你应该使用 completeExceptionally(new CancellationException())
参考
您可以使用 java.net.http.HttpClient
API 通过取消传递给响应的 BodySubscriber
的 Flow.Subscription
对象来取消 HTTP 请求。
简单地包装所提供的 BodyHandler
/BodySubscriber
实现之一以获取订阅对象应该相对容易。不幸的是,客户端返回的 CompletableFuture
的 cancel
方法与传递给 BodySubscriber
的 Flow.Subscription
的 cancel
方法之间没有任何关系。取消请求的正确方法是通过订阅的cancel
方法。
取消订阅将适用于同步 (HttpClient::send
) 和异步 (HttpClient::sendAsync
) 方法。
但是,它会产生不同的效果,具体取决于请求是通过 HTTP/1.1 还是 HTTP/2.0 发送的(使用 HTTP/1.1 将导致连接关闭,使用 HTTP/2.0 它将导致流被重置)。当然,如果响应的最后一个字节已经传送到 BodySubscriber
.
更新: 自 Java 16 以来,可以通过中断调用 HttpClient::send or by invoking cancel(true)
on the CompletableFuture
returned by HttpClient::sendAsync. This has been implemented by JDK-8245462
至少对于同步请求,您可以中断正在调用的线程 httpClient.send(..)
然后 http 客户端尽快中止请求并抛出 InterruptedException
本身。