如何在我的线程中断后立即中断 RestTemplate 调用?

How can I interrupt RestTemplate call as soon as my thread is interrupted?

我需要制作一个具有同步和异步功能的库。

我的库的核心逻辑

客户将使用我们的库,他们将通过传递 DataKey 构建器对象来调用它。然后,我们将通过使用该 DataKey 对象构造一个 URL 并通过执行它对 URL 进行 HTTP 客户端调用,在我们将响应作为 JSON 字符串返回后,我们将通过创建 DataResponse 对象将 JSON 字符串原样发送回我们的客户。有些客户会调用 executeSynchronous(),有些可能会调用 executeAsynchronous(),所以这就是为什么我需要在我的库中分别提供两种方法。

接口:

public interface Client {

    // for synchronous
    public DataResponse executeSynchronous(DataKey key);

    // for asynchronous
    public Future<DataResponse> executeAsynchronous(DataKey key);
}

然后我的 DataClient 实现了上面的 Client 接口:

public class DataClient implements Client {

    private RestTemplate restTemplate = new RestTemplate();
    private ExecutorService executor = Executors.newFixedThreadPool(10);

    // for synchronous call
    @Override
    public DataResponse executeSynchronous(DataKey key) {
        DataResponse dataResponse = null;
        Future<DataResponse> future = null;

        try {
            future = executeAsynchronous(key);
            dataResponse = future.get(key.getTimeout(), TimeUnit.MILLISECONDS);
        } catch (TimeoutException ex) {
            PotoLogging.logErrors(ex, DataErrorEnum.TIMEOUT_ON_CLIENT, key);
            dataResponse = new DataResponse(null, DataErrorEnum.TIMEOUT_ON_CLIENT, DataStatusEnum.ERROR);
            // does this looks right?
            future.cancel(true); // terminating tasks that have timed out
        } catch (Exception ex) {
            PotoLogging.logErrors(ex, DataErrorEnum.CLIENT_ERROR, key);
            dataResponse = new DataResponse(null, DataErrorEnum.CLIENT_ERROR, DataStatusEnum.ERROR);
        }

        return dataResponse;
    }

    //for asynchronous call
    @Override
    public Future<DataResponse> executeAsynchronous(DataKey key) {
        Future<DataResponse> future = null;

        try {
            Task task = new Task(key, restTemplate);
            future = executor.submit(task); 
        } catch (Exception ex) {
            PotoLogging.logErrors(ex, DataErrorEnum.CLIENT_ERROR, key);
        }

        return future;
    }
}

简单 class 将执行实际任务:

public class Task implements Callable<DataResponse> {

    private DataKey key;
    private RestTemplate restTemplate;

    public Task(DataKey key, RestTemplate restTemplate) {
        this.key = key;
        this.restTemplate = restTemplate;
    }

    @Override
    public DataResponse call() {
        DataResponse dataResponse = null;
        String response = null;

        try {
            String url = createURL();
            response = restTemplate.getForObject(url, String.class);

            // it is a successful response
            dataResponse = new DataResponse(response, DataErrorEnum.NONE, DataStatusEnum.SUCCESS);
        } catch (RestClientException ex) {
            PotoLogging.logErrors(ex, DataErrorEnum.SERVER_DOWN, key);
            dataResponse = new DataResponse(null, DataErrorEnum.SERVER_DOWN, DataStatusEnum.ERROR);
        } catch (Exception ex) {
            PotoLogging.logErrors(ex, DataErrorEnum.CLIENT_ERROR, key);
            dataResponse = new DataResponse(null, DataErrorEnum.CLIENT_ERROR, DataStatusEnum.ERROR);
        }

        return dataResponse;
    }

    // create a URL by using key object
    private String createURL() {
        String url = somecode;
        return url;
    }
}

当我开始研究这个解决方案时,我并没有终止已经超时的任务。我是在给客户端报超时,但是任务一直在线程池中运行(可能会长期占用我有限的10个线程中的一个)。所以我在网上做了一些研究,我发现我可以通过使用 cancel on future 来取消那些已经超时的任务,如下所示 -

future.cancel(true);

但是如果我像上面的解决方案那样这样做,那么我是否需要在线程中断时立即关闭任何其他资源,如 RestTemplate?如果是,那我该怎么做?另外,我们可以中断 RestTemplate 通话吗?因为我尝试在任务超时后立即调用取消,但我想我的线程没有被打断。

我们是否应该始终终止已超时的任务?如果我们不这样做,那么我会产生什么影响?它会影响我的表现吗?

我目前的设置是否有更好的解决方案来处理这种情况?

appears 无法中断或取消对 RestTemplate 的调用。即使 "kludge" 使用回调,RestTemplate 也可能在内部锁定资源,在调用回调之前等待响应。

当底层套接字可访问时,网络 I/O 可以通过从另一个线程关闭套接字来中止。例如,可以启动计时器以在超时结束后关闭套接字。或者,如果您想要一个对中断敏感的无限期超时(例如,由于用户按下 "Cancel" 按钮),您可以提交一个无限期等待但通过关闭套接字响应中断的任务。

不幸的是,RestTemplate 的作者似乎没有提供此功能。

是的,您应该清理由于任务取消或过期而不再需要的资源。是的,它会影响性能。如果您的线程池的线程数量有限,最终所有线程都会卡在已失效的任务中。如果它有无限数量的线程,最终内存会被耗尽。

有时无法中断线程,尤其是当线程对Socket 执行阻塞操作时。

因此,与其在超时时取消任务,不如在 http 连接上设置超时。

不幸的是,每个连接工厂和 RestTemplate 都设置了超时,因此每个请求都必须使用它自己的 RestTemplate。

您可以为每个任务创建新的 RestTemplate,或者使用 ThreadLocal 或资源池重用以前创建的模板。

例如,使用本地线程的任务可能如下所示:

    public class Task implements Callable<DataResponse> {

    private DataKey key;

    private ThreadLocal<RestTemplate> restTemplateThreadLocal =
            ThreadLocal.withInitial(()->new RestTemplate(new SimpleClientHttpRequestFactory()));

    public Task(DataKey key) {
        this.key = key;
    }

    private SimpleClientHttpRequestFactory getConnectionFactory(){
        return (SimpleClientHttpRequestFactory)restTemplateThreadLocal.get().getRequestFactory();
    }

    @Override
    public DataResponse call() {
        DataResponse dataResponse = null;
        String response = null;

        try {
            String url = createURL();
            //it is up to you, how to set connection and read timeouts from provided key.getTimeout
            getConnectionFactory().setConnectTimeout(1000);
            getConnectionFactory().setReadTimeout(key.getTimeout());
            response = restTemplateThreadLocal.get().getForObject(url, String.class);

            // it is a successful response
            dataResponse = new DataResponse(response, DataErrorEnum.NONE, DataStatusEnum.SUCCESS);
        } catch (RestClientException ex) {
            PotoLogging.logErrors(ex, DataErrorEnum.SERVER_DOWN, key);
            dataResponse = new DataResponse(null, DataErrorEnum.SERVER_DOWN, DataStatusEnum.ERROR);
        } catch (Exception ex) {
            PotoLogging.logErrors(ex, DataErrorEnum.CLIENT_ERROR, key);
            dataResponse = new DataResponse(null, DataErrorEnum.CLIENT_ERROR, DataStatusEnum.ERROR);
        }

        return dataResponse;
    }

    // create a URL by using key object
    private String createURL() {
        String url = somecode;
        return url;
    }
   }

顺便说一句。 Spring还提供了AsyncRestTemplate,可以让你的代码更简单。 如果与 Netty4ClientHttpRequestFactory 一起使用,您可以获得基于 NIO 的客户端连接。在这种情况下,即使它建立了 Http 连接,您也应该能够中断您的任务。

下面是简短示例。它使用 NIO,因此您不必关心请求是否真的在超时后被取消。

        URI url = new URI("http://www.chicagotribune.com/news/ct-college-of-dupage-investigation-met-20150330-story.html");
        Netty4ClientHttpRequestFactory asyncRequestFactory = new Netty4ClientHttpRequestFactory();
        AsyncRestTemplate asyncRestTemplate = new AsyncRestTemplate(asyncRequestFactory);
        ListenableFuture<ResponseEntity<String>> entity = asyncRestTemplate.getForEntity(url, String.class);
        System.out.println("entity.get() = " + entity.get());
        asyncRequestFactory.destroy();