如何在我的线程中断后立即中断 RestTemplate 调用?
How can I interrupt RestTemplate call as soon as my thread is interrupted?
我需要制作一个具有同步和异步功能的库。
executeSynchronous()
- 等到我有结果,returns 结果。
executeAsynchronous()
- returns 如果需要,可以在其他事情完成后立即处理 Future。
我的库的核心逻辑
客户将使用我们的库,他们将通过传递 DataKey
构建器对象来调用它。然后,我们将通过使用该 DataKey
对象构造一个 URL 并通过执行它对 URL 进行 HTTP 客户端调用,在我们将响应作为 JSON 字符串返回后,我们将通过创建 DataResponse
对象将 JSON 字符串原样发送回我们的客户。有些客户会调用 executeSynchronous()
,有些可能会调用 executeAsynchronous()
,所以这就是为什么我需要在我的库中分别提供两种方法。
接口:
public interface Client {
// for synchronous
public DataResponse executeSynchronous(DataKey key);
// for asynchronous
public Future<DataResponse> executeAsynchronous(DataKey key);
}
然后我的 DataClient
实现了上面的 Client
接口:
public class DataClient implements Client {
private RestTemplate restTemplate = new RestTemplate();
private ExecutorService executor = Executors.newFixedThreadPool(10);
// for synchronous call
@Override
public DataResponse executeSynchronous(DataKey key) {
DataResponse dataResponse = null;
Future<DataResponse> future = null;
try {
future = executeAsynchronous(key);
dataResponse = future.get(key.getTimeout(), TimeUnit.MILLISECONDS);
} catch (TimeoutException ex) {
PotoLogging.logErrors(ex, DataErrorEnum.TIMEOUT_ON_CLIENT, key);
dataResponse = new DataResponse(null, DataErrorEnum.TIMEOUT_ON_CLIENT, DataStatusEnum.ERROR);
// does this looks right?
future.cancel(true); // terminating tasks that have timed out
} catch (Exception ex) {
PotoLogging.logErrors(ex, DataErrorEnum.CLIENT_ERROR, key);
dataResponse = new DataResponse(null, DataErrorEnum.CLIENT_ERROR, DataStatusEnum.ERROR);
}
return dataResponse;
}
//for asynchronous call
@Override
public Future<DataResponse> executeAsynchronous(DataKey key) {
Future<DataResponse> future = null;
try {
Task task = new Task(key, restTemplate);
future = executor.submit(task);
} catch (Exception ex) {
PotoLogging.logErrors(ex, DataErrorEnum.CLIENT_ERROR, key);
}
return future;
}
}
简单 class 将执行实际任务:
public class Task implements Callable<DataResponse> {
private DataKey key;
private RestTemplate restTemplate;
public Task(DataKey key, RestTemplate restTemplate) {
this.key = key;
this.restTemplate = restTemplate;
}
@Override
public DataResponse call() {
DataResponse dataResponse = null;
String response = null;
try {
String url = createURL();
response = restTemplate.getForObject(url, String.class);
// it is a successful response
dataResponse = new DataResponse(response, DataErrorEnum.NONE, DataStatusEnum.SUCCESS);
} catch (RestClientException ex) {
PotoLogging.logErrors(ex, DataErrorEnum.SERVER_DOWN, key);
dataResponse = new DataResponse(null, DataErrorEnum.SERVER_DOWN, DataStatusEnum.ERROR);
} catch (Exception ex) {
PotoLogging.logErrors(ex, DataErrorEnum.CLIENT_ERROR, key);
dataResponse = new DataResponse(null, DataErrorEnum.CLIENT_ERROR, DataStatusEnum.ERROR);
}
return dataResponse;
}
// create a URL by using key object
private String createURL() {
String url = somecode;
return url;
}
}
当我开始研究这个解决方案时,我并没有终止已经超时的任务。我是在给客户端报超时,但是任务一直在线程池中运行(可能会长期占用我有限的10个线程中的一个)。所以我在网上做了一些研究,我发现我可以通过使用 cancel on future 来取消那些已经超时的任务,如下所示 -
future.cancel(true);
但是如果我像上面的解决方案那样这样做,那么我是否需要在线程中断时立即关闭任何其他资源,如 RestTemplate
?如果是,那我该怎么做?另外,我们可以中断 RestTemplate
通话吗?因为我尝试在任务超时后立即调用取消,但我想我的线程没有被打断。
我们是否应该始终终止已超时的任务?如果我们不这样做,那么我会产生什么影响?它会影响我的表现吗?
我目前的设置是否有更好的解决方案来处理这种情况?
appears 无法中断或取消对 RestTemplate
的调用。即使 "kludge" 使用回调,RestTemplate
也可能在内部锁定资源,在调用回调之前等待响应。
当底层套接字可访问时,网络 I/O 可以通过从另一个线程关闭套接字来中止。例如,可以启动计时器以在超时结束后关闭套接字。或者,如果您想要一个对中断敏感的无限期超时(例如,由于用户按下 "Cancel" 按钮),您可以提交一个无限期等待但通过关闭套接字响应中断的任务。
不幸的是,RestTemplate
的作者似乎没有提供此功能。
是的,您应该清理由于任务取消或过期而不再需要的资源。是的,它会影响性能。如果您的线程池的线程数量有限,最终所有线程都会卡在已失效的任务中。如果它有无限数量的线程,最终内存会被耗尽。
有时无法中断线程,尤其是当线程对Socket 执行阻塞操作时。
因此,与其在超时时取消任务,不如在 http 连接上设置超时。
不幸的是,每个连接工厂和 RestTemplate 都设置了超时,因此每个请求都必须使用它自己的 RestTemplate。
您可以为每个任务创建新的 RestTemplate,或者使用 ThreadLocal 或资源池重用以前创建的模板。
例如,使用本地线程的任务可能如下所示:
public class Task implements Callable<DataResponse> {
private DataKey key;
private ThreadLocal<RestTemplate> restTemplateThreadLocal =
ThreadLocal.withInitial(()->new RestTemplate(new SimpleClientHttpRequestFactory()));
public Task(DataKey key) {
this.key = key;
}
private SimpleClientHttpRequestFactory getConnectionFactory(){
return (SimpleClientHttpRequestFactory)restTemplateThreadLocal.get().getRequestFactory();
}
@Override
public DataResponse call() {
DataResponse dataResponse = null;
String response = null;
try {
String url = createURL();
//it is up to you, how to set connection and read timeouts from provided key.getTimeout
getConnectionFactory().setConnectTimeout(1000);
getConnectionFactory().setReadTimeout(key.getTimeout());
response = restTemplateThreadLocal.get().getForObject(url, String.class);
// it is a successful response
dataResponse = new DataResponse(response, DataErrorEnum.NONE, DataStatusEnum.SUCCESS);
} catch (RestClientException ex) {
PotoLogging.logErrors(ex, DataErrorEnum.SERVER_DOWN, key);
dataResponse = new DataResponse(null, DataErrorEnum.SERVER_DOWN, DataStatusEnum.ERROR);
} catch (Exception ex) {
PotoLogging.logErrors(ex, DataErrorEnum.CLIENT_ERROR, key);
dataResponse = new DataResponse(null, DataErrorEnum.CLIENT_ERROR, DataStatusEnum.ERROR);
}
return dataResponse;
}
// create a URL by using key object
private String createURL() {
String url = somecode;
return url;
}
}
顺便说一句。
Spring还提供了AsyncRestTemplate,可以让你的代码更简单。
如果与 Netty4ClientHttpRequestFactory 一起使用,您可以获得基于 NIO 的客户端连接。在这种情况下,即使它建立了 Http 连接,您也应该能够中断您的任务。
下面是简短示例。它使用 NIO,因此您不必关心请求是否真的在超时后被取消。
URI url = new URI("http://www.chicagotribune.com/news/ct-college-of-dupage-investigation-met-20150330-story.html");
Netty4ClientHttpRequestFactory asyncRequestFactory = new Netty4ClientHttpRequestFactory();
AsyncRestTemplate asyncRestTemplate = new AsyncRestTemplate(asyncRequestFactory);
ListenableFuture<ResponseEntity<String>> entity = asyncRestTemplate.getForEntity(url, String.class);
System.out.println("entity.get() = " + entity.get());
asyncRequestFactory.destroy();
我需要制作一个具有同步和异步功能的库。
executeSynchronous()
- 等到我有结果,returns 结果。executeAsynchronous()
- returns 如果需要,可以在其他事情完成后立即处理 Future。
我的库的核心逻辑
客户将使用我们的库,他们将通过传递 DataKey
构建器对象来调用它。然后,我们将通过使用该 DataKey
对象构造一个 URL 并通过执行它对 URL 进行 HTTP 客户端调用,在我们将响应作为 JSON 字符串返回后,我们将通过创建 DataResponse
对象将 JSON 字符串原样发送回我们的客户。有些客户会调用 executeSynchronous()
,有些可能会调用 executeAsynchronous()
,所以这就是为什么我需要在我的库中分别提供两种方法。
接口:
public interface Client {
// for synchronous
public DataResponse executeSynchronous(DataKey key);
// for asynchronous
public Future<DataResponse> executeAsynchronous(DataKey key);
}
然后我的 DataClient
实现了上面的 Client
接口:
public class DataClient implements Client {
private RestTemplate restTemplate = new RestTemplate();
private ExecutorService executor = Executors.newFixedThreadPool(10);
// for synchronous call
@Override
public DataResponse executeSynchronous(DataKey key) {
DataResponse dataResponse = null;
Future<DataResponse> future = null;
try {
future = executeAsynchronous(key);
dataResponse = future.get(key.getTimeout(), TimeUnit.MILLISECONDS);
} catch (TimeoutException ex) {
PotoLogging.logErrors(ex, DataErrorEnum.TIMEOUT_ON_CLIENT, key);
dataResponse = new DataResponse(null, DataErrorEnum.TIMEOUT_ON_CLIENT, DataStatusEnum.ERROR);
// does this looks right?
future.cancel(true); // terminating tasks that have timed out
} catch (Exception ex) {
PotoLogging.logErrors(ex, DataErrorEnum.CLIENT_ERROR, key);
dataResponse = new DataResponse(null, DataErrorEnum.CLIENT_ERROR, DataStatusEnum.ERROR);
}
return dataResponse;
}
//for asynchronous call
@Override
public Future<DataResponse> executeAsynchronous(DataKey key) {
Future<DataResponse> future = null;
try {
Task task = new Task(key, restTemplate);
future = executor.submit(task);
} catch (Exception ex) {
PotoLogging.logErrors(ex, DataErrorEnum.CLIENT_ERROR, key);
}
return future;
}
}
简单 class 将执行实际任务:
public class Task implements Callable<DataResponse> {
private DataKey key;
private RestTemplate restTemplate;
public Task(DataKey key, RestTemplate restTemplate) {
this.key = key;
this.restTemplate = restTemplate;
}
@Override
public DataResponse call() {
DataResponse dataResponse = null;
String response = null;
try {
String url = createURL();
response = restTemplate.getForObject(url, String.class);
// it is a successful response
dataResponse = new DataResponse(response, DataErrorEnum.NONE, DataStatusEnum.SUCCESS);
} catch (RestClientException ex) {
PotoLogging.logErrors(ex, DataErrorEnum.SERVER_DOWN, key);
dataResponse = new DataResponse(null, DataErrorEnum.SERVER_DOWN, DataStatusEnum.ERROR);
} catch (Exception ex) {
PotoLogging.logErrors(ex, DataErrorEnum.CLIENT_ERROR, key);
dataResponse = new DataResponse(null, DataErrorEnum.CLIENT_ERROR, DataStatusEnum.ERROR);
}
return dataResponse;
}
// create a URL by using key object
private String createURL() {
String url = somecode;
return url;
}
}
当我开始研究这个解决方案时,我并没有终止已经超时的任务。我是在给客户端报超时,但是任务一直在线程池中运行(可能会长期占用我有限的10个线程中的一个)。所以我在网上做了一些研究,我发现我可以通过使用 cancel on future 来取消那些已经超时的任务,如下所示 -
future.cancel(true);
但是如果我像上面的解决方案那样这样做,那么我是否需要在线程中断时立即关闭任何其他资源,如 RestTemplate
?如果是,那我该怎么做?另外,我们可以中断 RestTemplate
通话吗?因为我尝试在任务超时后立即调用取消,但我想我的线程没有被打断。
我们是否应该始终终止已超时的任务?如果我们不这样做,那么我会产生什么影响?它会影响我的表现吗?
我目前的设置是否有更好的解决方案来处理这种情况?
appears 无法中断或取消对 RestTemplate
的调用。即使 "kludge" 使用回调,RestTemplate
也可能在内部锁定资源,在调用回调之前等待响应。
当底层套接字可访问时,网络 I/O 可以通过从另一个线程关闭套接字来中止。例如,可以启动计时器以在超时结束后关闭套接字。或者,如果您想要一个对中断敏感的无限期超时(例如,由于用户按下 "Cancel" 按钮),您可以提交一个无限期等待但通过关闭套接字响应中断的任务。
不幸的是,RestTemplate
的作者似乎没有提供此功能。
是的,您应该清理由于任务取消或过期而不再需要的资源。是的,它会影响性能。如果您的线程池的线程数量有限,最终所有线程都会卡在已失效的任务中。如果它有无限数量的线程,最终内存会被耗尽。
有时无法中断线程,尤其是当线程对Socket 执行阻塞操作时。
因此,与其在超时时取消任务,不如在 http 连接上设置超时。
不幸的是,每个连接工厂和 RestTemplate 都设置了超时,因此每个请求都必须使用它自己的 RestTemplate。
您可以为每个任务创建新的 RestTemplate,或者使用 ThreadLocal 或资源池重用以前创建的模板。
例如,使用本地线程的任务可能如下所示:
public class Task implements Callable<DataResponse> {
private DataKey key;
private ThreadLocal<RestTemplate> restTemplateThreadLocal =
ThreadLocal.withInitial(()->new RestTemplate(new SimpleClientHttpRequestFactory()));
public Task(DataKey key) {
this.key = key;
}
private SimpleClientHttpRequestFactory getConnectionFactory(){
return (SimpleClientHttpRequestFactory)restTemplateThreadLocal.get().getRequestFactory();
}
@Override
public DataResponse call() {
DataResponse dataResponse = null;
String response = null;
try {
String url = createURL();
//it is up to you, how to set connection and read timeouts from provided key.getTimeout
getConnectionFactory().setConnectTimeout(1000);
getConnectionFactory().setReadTimeout(key.getTimeout());
response = restTemplateThreadLocal.get().getForObject(url, String.class);
// it is a successful response
dataResponse = new DataResponse(response, DataErrorEnum.NONE, DataStatusEnum.SUCCESS);
} catch (RestClientException ex) {
PotoLogging.logErrors(ex, DataErrorEnum.SERVER_DOWN, key);
dataResponse = new DataResponse(null, DataErrorEnum.SERVER_DOWN, DataStatusEnum.ERROR);
} catch (Exception ex) {
PotoLogging.logErrors(ex, DataErrorEnum.CLIENT_ERROR, key);
dataResponse = new DataResponse(null, DataErrorEnum.CLIENT_ERROR, DataStatusEnum.ERROR);
}
return dataResponse;
}
// create a URL by using key object
private String createURL() {
String url = somecode;
return url;
}
}
顺便说一句。 Spring还提供了AsyncRestTemplate,可以让你的代码更简单。 如果与 Netty4ClientHttpRequestFactory 一起使用,您可以获得基于 NIO 的客户端连接。在这种情况下,即使它建立了 Http 连接,您也应该能够中断您的任务。
下面是简短示例。它使用 NIO,因此您不必关心请求是否真的在超时后被取消。
URI url = new URI("http://www.chicagotribune.com/news/ct-college-of-dupage-investigation-met-20150330-story.html");
Netty4ClientHttpRequestFactory asyncRequestFactory = new Netty4ClientHttpRequestFactory();
AsyncRestTemplate asyncRestTemplate = new AsyncRestTemplate(asyncRequestFactory);
ListenableFuture<ResponseEntity<String>> entity = asyncRestTemplate.getForEntity(url, String.class);
System.out.println("entity.get() = " + entity.get());
asyncRequestFactory.destroy();