Java 11 HttpClient 取消http请求

Cancellation of http request in Java 11 HttpClient

我正在尝试通过新 Java 11 HttpClient 取消 http 请求。

这是我的测试代码:

import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class App {

    public static void main(String... args) throws InterruptedException {
        HttpClient client = HttpClient.newBuilder().build();

        URI uri = URI.create("http://releases.ubuntu.com/18.04.2/ubuntu-18.04.2-desktop-amd64.iso");
        HttpRequest request = HttpRequest.newBuilder().uri(uri).GET().build();

        var bodyHandler = HttpResponse.BodyHandlers.ofByteArrayConsumer(b -> System.out.println("#"));
        var future = client.sendAsync(request, bodyHandler);
        Thread.sleep(1000);

        future.cancel(true);
        System.out.println("\r\n----------CANCEL!!!------------");
        System.out.println("\r\nisCancelled: " + future.isCancelled());
        Thread.sleep(250);
    }
}

我预计,该请求任务将在 future.cancel(true); 行调用后立即取消。因此,控制台中最后打印的行应该是 isCancelled: true

但是,当我 运行 这段代码时,我看到了这样的东西:

####################################################################################################
----------CANCEL!!!------------
####
isCancelled: true
#######################################################################################################################################################

这意味着,请求任务在我取消后仍然 运行ning... 那么,这是取消请求的正确方法吗?

UPD

取消请求的正确方法是(正如 daniel 所建议的那样,+ UPD2:在 cancel() 方法调用时避免 NPE):

import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.net.http.HttpResponse.BodyHandler;
import java.net.http.HttpResponse.BodySubscriber;
import java.net.http.HttpResponse.ResponseInfo;
import java.nio.ByteBuffer;
import java.util.List;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Flow.Subscription;

public class App {

    private static class SubscriberWrapper implements BodySubscriber<Void> {
        private final CountDownLatch latch;
        private final BodySubscriber<Void> subscriber;
        private Subscription subscription;

        private SubscriberWrapper(BodySubscriber<Void> subscriber, CountDownLatch latch) {
            this.subscriber = subscriber;
            this.latch = latch;
        }

        @Override
        public CompletionStage<Void> getBody() {
            return subscriber.getBody();
        }

        @Override
        public void onSubscribe(Subscription subscription) {
            subscriber.onSubscribe(subscription);
            this.subscription = subscription;
            latch.countDown();
        }

        @Override
        public void onNext(List<ByteBuffer> item) {
            subscriber.onNext(item);
        }

        @Override
        public void onError(Throwable throwable) {
            subscriber.onError(throwable);
        }

        @Override
        public void onComplete() {
            subscriber.onComplete();
        }

        public void cancel() {
            subscription.cancel();
            System.out.println("\r\n----------CANCEL!!!------------");
        }
    }

    private static class BodyHandlerWrapper implements BodyHandler<Void> {
        private final CountDownLatch latch = new CountDownLatch(1);
        private final BodyHandler<Void> handler;
        private SubscriberWrapper subscriberWrapper;

        private BodyHandlerWrapper(BodyHandler<Void> handler) {
            this.handler = handler;
        }

        @Override
        public BodySubscriber<Void> apply(ResponseInfo responseInfo) {
            subscriberWrapper = new SubscriberWrapper(handler.apply(responseInfo), latch);
            return subscriberWrapper;
        }

        public void cancel() {
            CompletableFuture.runAsync(() -> {
                try {
                    latch.await();
                    subscriberWrapper.cancel();
                } catch (InterruptedException e) {}
            });
        }
    }

    public static void main(String... args) throws InterruptedException, ExecutionException {
        HttpClient client = HttpClient.newBuilder().build();

        URI uri = URI.create("http://releases.ubuntu.com/18.04.2/ubuntu-18.04.2-desktop-amd64.iso");
        HttpRequest request = HttpRequest.newBuilder().uri(uri).GET().build();

        var handler = HttpResponse.BodyHandlers.ofByteArrayConsumer(b -> System.out.print("#"));
        BodyHandlerWrapper handlerWrapper = new BodyHandlerWrapper(handler);

        client.sendAsync(request, handlerWrapper).thenAccept(b -> System.out.println(b.statusCode()));
        Thread.sleep(1000);
        handlerWrapper.cancel();

        System.out.println("\r\n------Invoke cancel...---------");
        Thread.sleep(2500);
    }
}

同步VS异步

可以同步或异步发送请求。同步 API 阻塞,直到 Http 响应可用

HttpResponse<String> response =
      client.send(request, BodyHandlers.ofString());
System.out.println(response.statusCode());
System.out.println(response.body());

异步 API returns 立即与 CompletableFuture 一起完成,当 HttpResponse 可用时。 CompletableFuture 是在 Java 8 中添加的,支持可组合的异步编程。

client.sendAsync(request, BodyHandlers.ofString())
      .thenApply(response -> { System.out.println(response.statusCode());
                               return response; } )
      .thenApply(HttpResponse::body)
      .thenAccept(System.out::println);

未来对象

A Future represents the result of an asynchronous computation. Java Doc

意味着它不是同步函数,您的假设 "I expect, that request task will be cancelled right after" 仅适用于同步方法。

检查 Future 对象的取消

如果您想检查您的任务是否被取消,有一个有用的 isCancelled() 方法。

if(future.isCancelled()) {
  // Future object is cancelled, do smth
} else {
  // Future object is still running, do smth
}

sendAsync() returns CompletableFuture 对象

方法sendAsync()returns一个CompletableFuture。请注意,CompletableFuture 实现了 Future.

的接口

您可以这样做:

client.sendAsync(request, BodyHandlers.ofString())
          .thenAccept(response -> {
       // do action when completed;
});

在技术术语中,thenAccept 方法添加了一个 Consumer 以在响应可用时调用。

为什么 CompeletableFuture 上的取消方法不起作用

由于(与 FutureTask 不同)此 class 无法直接控制导致其完成的计算,因此取消仅被视为异常完成的另一种形式。方法 cancel 与 completeExceptionally(new CancellationException()) 具有相同的效果。方法 isCompletedExceptionally() 可用于确定 CompletableFuture 是否以异常方式完成。

如果使用 CompletionException 异常完成,方法 get()get(long, TimeUnit) 会抛出一个 ExecutionException,其原因与相应的 CompletionException 中的原因相同].为了简化大多数情况下的使用,此 class 还定义了方法 join() 和 getNow(T),它们在这些情况下直接抛出 CompletionException

换句话说

cancel() 方法不使用中断来取消,这就是它不起作用的原因。你应该使用 completeExceptionally(new CancellationException())

参考

您可以使用 java.net.http.HttpClient API 通过取消传递给响应的 BodySubscriberFlow.Subscription 对象来取消 HTTP 请求。 简单地包装所提供的 BodyHandler/BodySubscriber 实现之一以获取订阅对象应该相对容易。不幸的是,客户端返回的 CompletableFuturecancel 方法与传递给 BodySubscriberFlow.Subscriptioncancel 方法之间没有任何关系。取消请求的正确方法是通过订阅的cancel方法。

取消订阅将适用于同步 (HttpClient::send) 和异步 (HttpClient::sendAsync) 方法。 但是,它会产生不同的效果,具体取决于请求是通过 HTTP/1.1 还是 HTTP/2.0 发送的(使用 HTTP/1.1 将导致连接关闭,使用 HTTP/2.0 它将导致流被重置)。当然,如果响应的最后一个字节已经传送到 BodySubscriber.

,它可能根本没有任何影响

更新: 自 Java 16 以来,可以通过中断调用 HttpClient::send or by invoking cancel(true) on the CompletableFuture returned by HttpClient::sendAsync. This has been implemented by JDK-8245462

的线程来取消请求

至少对于同步请求,您可以中断正在调用的线程 httpClient.send(..)

然后 http 客户端尽快中止请求并抛出 InterruptedException 本身。