如何终止多线程中超时的任务?
How can I terminate Tasks that have timed out in multithreading?
我需要创建一个库,其中包含同步和异步方法。
executeSynchronous()
- 等到我有结果,returns 结果。
executeAsynchronous()
- returns 如果需要,可以在其他事情完成后立即处理 Future。
我的库的核心逻辑
客户将使用我们的库,他们将通过传递 DataKey
构建器对象来调用它。然后,我们将通过使用该 DataKey
对象构造一个 URL 并通过执行它对该 URL 进行 HTTP 客户端调用,在我们将响应作为 JSON 字符串返回后,我们将通过创建 DataResponse
对象将 JSON 字符串原样发送回我们的客户。有些客户会调用 executeSynchronous()
,有些可能会调用 executeAsynchronous()
,所以这就是为什么我需要在我的库中分别提供两种方法。
接口:
public interface Client {
// for synchronous
public DataResponse executeSynchronous(DataKey key);
// for asynchronous
public Future<DataResponse> executeAsynchronous(DataKey key);
}
然后我的 DataClient
实现了上面的 Client
接口:
public class DataClient implements Client {
private RestTemplate restTemplate = new RestTemplate();
private ExecutorService executor = Executors.newFixedThreadPool(10);
// for synchronous call
@Override
public DataResponse executeSynchronous(DataKey key) {
DataResponse dataResponse = null;
Future<DataResponse> future = null;
try {
future = executeAsynchronous(key);
dataResponse = future.get(key.getTimeout(), TimeUnit.MILLISECONDS);
} catch (TimeoutException ex) {
PotoLogging.logErrors(ex, DataErrorEnum.TIMEOUT_ON_CLIENT, key);
dataResponse = new DataResponse(null, DataErrorEnum.TIMEOUT_ON_CLIENT, DataStatusEnum.ERROR);
// does this look right the way I am doing it?
future.cancel(true); // terminating tasks that have timed out.
} catch (Exception ex) {
PotoLogging.logErrors(ex, DataErrorEnum.CLIENT_ERROR, key);
dataResponse = new DataResponse(null, DataErrorEnum.CLIENT_ERROR, DataStatusEnum.ERROR);
}
return dataResponse;
}
//for asynchronous call
@Override
public Future<DataResponse> executeAsynchronous(DataKey key) {
Future<DataResponse> future = null;
try {
Task task = new Task(key, restTemplate);
future = executor.submit(task);
} catch (Exception ex) {
PotoLogging.logErrors(ex, DataErrorEnum.CLIENT_ERROR, key);
}
return future;
}
}
简单 class 将执行实际任务:
public class Task implements Callable<DataResponse> {
private DataKey key;
private RestTemplate restTemplate;
public Task(DataKey key, RestTemplate restTemplate) {
this.key = key;
this.restTemplate = restTemplate;
}
@Override
public DataResponse call() {
DataResponse dataResponse = null;
String response = null;
try {
String url = createURL();
response = restTemplate.getForObject(url, String.class);
// it is a successful response
dataResponse = new DataResponse(response, DataErrorEnum.NONE, DataStatusEnum.SUCCESS);
} catch (RestClientException ex) {
PotoLogging.logErrors(ex, DataErrorEnum.SERVER_DOWN, key);
dataResponse = new DataResponse(null, DataErrorEnum.SERVER_DOWN, DataStatusEnum.ERROR);
} catch (Exception ex) {
PotoLogging.logErrors(ex, DataErrorEnum.CLIENT_ERROR, key);
dataResponse = new DataResponse(null, DataErrorEnum.CLIENT_ERROR, DataStatusEnum.ERROR);
}
return dataResponse;
}
// create a URL by using key object
private String createURL() {
String url = somecode;
return url;
}
}
问题陈述:-
当我开始研究这个解决方案时,我并没有终止已经超时的任务。我是在给客户端报超时,但是任务一直在线程池中运行(可能会长期占用我有限的10个线程中的一个)。所以我在网上做了一些研究,我发现我可以通过在 future
上使用 cancel
来取消那些已经超时的任务,如下所示 -
future.cancel(true);
但我想确定一下,我在 executeSynchronous
方法中取消超时任务的方式是否正确?
因为我在 Future
上调用 cancel()
如果任务仍在队列中,它将停止 运行ning,所以我不确定我在做什么是对的或不?执行此操作的正确方法是什么?
如果有更好的方法,谁能举个例子?
如果任务仍在队列中,则只需调用 future.cancel()
即可取消它,但显然您不知道它是否在队列中。此外,即使您要求 future
中断任务,它也可能无法正常工作,因为您的任务仍然可以做一些忽略线程中断状态的事情。
所以你可以使用future.cancel(true)
但是你需要确保你的任务(线程)确实考虑到线程中断状态。例如,正如您提到的,您进行了 http 调用,因此您可能需要在线程中断后立即关闭 http 客户端资源。
请参考下面的例子。
我尝试实现了任务取消场景。通常一个线程可以检查 isInterrupted()
并尝试终止自己。但是当你使用线程池执行器时,这会变得更加复杂,可调用并且任务不是真的像 while(!Thread.isInterrupted()) {// execute task}
。
在这个例子中,一个任务是写一个文件(为了简单起见,我没有使用 http 调用)。线程池执行程序启动 运行 任务,但调用者想在 100 毫秒后立即取消它。现在 future 向线程发送中断信号,但可调用任务在写入文件时无法立即检查它。因此,为了实现这一点,可调用维护了一个它将要使用的 IO 资源列表,一旦将来想要取消任务,它就在所有 IO 资源上调用 cancel()
,这会以 IOException 终止任务,然后线程完成。
public class CancellableTaskTest {
public static void main(String[] args) throws Exception {
CancellableThreadPoolExecutor threadPoolExecutor = new CancellableThreadPoolExecutor(0, 10, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
long startTime = System.currentTimeMillis();
Future<String> future = threadPoolExecutor.submit(new CancellableTask());
while (System.currentTimeMillis() - startTime < 100) {
Thread.sleep(10);
}
System.out.println("Trying to cancel task");
future.cancel(true);
}
}
class CancellableThreadPoolExecutor extends ThreadPoolExecutor {
public CancellableThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
}
@Override
protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
return new CancellableFutureTask<T>(callable);
}
}
class CancellableFutureTask<V> extends FutureTask<V> {
private WeakReference<CancellableTask> weakReference;
public CancellableFutureTask(Callable<V> callable) {
super(callable);
if (callable instanceof CancellableTask) {
this.weakReference = new WeakReference<CancellableTask>((CancellableTask) callable);
}
}
public boolean cancel(boolean mayInterruptIfRunning) {
boolean result = super.cancel(mayInterruptIfRunning);
if (weakReference != null) {
CancellableTask task = weakReference.get();
if (task != null) {
try {
task.cancel();
} catch (Exception e) {
e.printStackTrace();
result = false;
}
}
}
return result;
}
}
class CancellableTask implements Callable<String> {
private volatile boolean cancelled;
private final Object lock = new Object();
private LinkedList<Object> cancellableResources = new LinkedList<Object>();
@Override
public String call() throws Exception {
if (!cancelled) {
System.out.println("Task started");
// write file
File file = File.createTempFile("testfile", ".txt");
BufferedWriter writer = new BufferedWriter(new FileWriter(file));
synchronized (lock) {
cancellableResources.add(writer);
}
try {
long lineCount = 0;
while (lineCount++ < 100000000) {
writer.write("This is a test text at line: " + lineCount);
writer.newLine();
}
System.out.println("Task completed");
} catch (Exception e) {
e.printStackTrace();
} finally {
writer.close();
file.delete();
synchronized (lock) {
cancellableResources.clear();
}
}
}
return "done";
}
public void cancel() throws Exception {
cancelled = true;
Thread.sleep(1000);
boolean success = false;
synchronized (lock) {
for (Object cancellableResource : cancellableResources) {
if (cancellableResource instanceof Closeable) {
((Closeable) cancellableResource).close();
success = true;
}
}
}
System.out.println("Task " + (success ? "cancelled" : "could not be cancelled. It might have completed or not started at all"));
}
}
对于您的 REST Http 客户端相关要求,您可以修改工厂 class 类似这样的内容 -
public class CancellableSimpleClientHttpRequestFactory extends SimpleClientHttpRequestFactory {
private List<Object> cancellableResources;
public CancellableSimpleClientHttpRequestFactory() {
}
public CancellableSimpleClientHttpRequestFactory(List<Object> cancellableResources) {
this.cancellableResources = cancellableResources;
}
protected HttpURLConnection openConnection(URL url, Proxy proxy) throws IOException {
HttpURLConnection connection = super.openConnection(url, proxy);
if (cancellableResources != null) {
cancellableResources.add(connection);
}
return connection;
}
}
在这里,您需要在可运行任务中创建 RestTemplate
时使用此工厂。
RestTemplate template = new RestTemplate(new CancellableSimpleClientHttpRequestFactory(this.cancellableResources));
确保您传递的可取消资源列表与您在 CancellableTask
中维护的列表相同。
现在你需要像这样修改CancellableTask
中的cancel()
方法-
synchronized (lock) {
for (Object cancellableResource : cancellableResources) {
if (cancellableResource instanceof HttpURLConnection) {
((HttpURLConnection) cancellableResource).disconnect();
success = true;
}
}
}
我需要创建一个库,其中包含同步和异步方法。
executeSynchronous()
- 等到我有结果,returns 结果。executeAsynchronous()
- returns 如果需要,可以在其他事情完成后立即处理 Future。
我的库的核心逻辑
客户将使用我们的库,他们将通过传递 DataKey
构建器对象来调用它。然后,我们将通过使用该 DataKey
对象构造一个 URL 并通过执行它对该 URL 进行 HTTP 客户端调用,在我们将响应作为 JSON 字符串返回后,我们将通过创建 DataResponse
对象将 JSON 字符串原样发送回我们的客户。有些客户会调用 executeSynchronous()
,有些可能会调用 executeAsynchronous()
,所以这就是为什么我需要在我的库中分别提供两种方法。
接口:
public interface Client {
// for synchronous
public DataResponse executeSynchronous(DataKey key);
// for asynchronous
public Future<DataResponse> executeAsynchronous(DataKey key);
}
然后我的 DataClient
实现了上面的 Client
接口:
public class DataClient implements Client {
private RestTemplate restTemplate = new RestTemplate();
private ExecutorService executor = Executors.newFixedThreadPool(10);
// for synchronous call
@Override
public DataResponse executeSynchronous(DataKey key) {
DataResponse dataResponse = null;
Future<DataResponse> future = null;
try {
future = executeAsynchronous(key);
dataResponse = future.get(key.getTimeout(), TimeUnit.MILLISECONDS);
} catch (TimeoutException ex) {
PotoLogging.logErrors(ex, DataErrorEnum.TIMEOUT_ON_CLIENT, key);
dataResponse = new DataResponse(null, DataErrorEnum.TIMEOUT_ON_CLIENT, DataStatusEnum.ERROR);
// does this look right the way I am doing it?
future.cancel(true); // terminating tasks that have timed out.
} catch (Exception ex) {
PotoLogging.logErrors(ex, DataErrorEnum.CLIENT_ERROR, key);
dataResponse = new DataResponse(null, DataErrorEnum.CLIENT_ERROR, DataStatusEnum.ERROR);
}
return dataResponse;
}
//for asynchronous call
@Override
public Future<DataResponse> executeAsynchronous(DataKey key) {
Future<DataResponse> future = null;
try {
Task task = new Task(key, restTemplate);
future = executor.submit(task);
} catch (Exception ex) {
PotoLogging.logErrors(ex, DataErrorEnum.CLIENT_ERROR, key);
}
return future;
}
}
简单 class 将执行实际任务:
public class Task implements Callable<DataResponse> {
private DataKey key;
private RestTemplate restTemplate;
public Task(DataKey key, RestTemplate restTemplate) {
this.key = key;
this.restTemplate = restTemplate;
}
@Override
public DataResponse call() {
DataResponse dataResponse = null;
String response = null;
try {
String url = createURL();
response = restTemplate.getForObject(url, String.class);
// it is a successful response
dataResponse = new DataResponse(response, DataErrorEnum.NONE, DataStatusEnum.SUCCESS);
} catch (RestClientException ex) {
PotoLogging.logErrors(ex, DataErrorEnum.SERVER_DOWN, key);
dataResponse = new DataResponse(null, DataErrorEnum.SERVER_DOWN, DataStatusEnum.ERROR);
} catch (Exception ex) {
PotoLogging.logErrors(ex, DataErrorEnum.CLIENT_ERROR, key);
dataResponse = new DataResponse(null, DataErrorEnum.CLIENT_ERROR, DataStatusEnum.ERROR);
}
return dataResponse;
}
// create a URL by using key object
private String createURL() {
String url = somecode;
return url;
}
}
问题陈述:-
当我开始研究这个解决方案时,我并没有终止已经超时的任务。我是在给客户端报超时,但是任务一直在线程池中运行(可能会长期占用我有限的10个线程中的一个)。所以我在网上做了一些研究,我发现我可以通过在 future
上使用 cancel
来取消那些已经超时的任务,如下所示 -
future.cancel(true);
但我想确定一下,我在 executeSynchronous
方法中取消超时任务的方式是否正确?
因为我在 Future
上调用 cancel()
如果任务仍在队列中,它将停止 运行ning,所以我不确定我在做什么是对的或不?执行此操作的正确方法是什么?
如果有更好的方法,谁能举个例子?
如果任务仍在队列中,则只需调用 future.cancel()
即可取消它,但显然您不知道它是否在队列中。此外,即使您要求 future
中断任务,它也可能无法正常工作,因为您的任务仍然可以做一些忽略线程中断状态的事情。
所以你可以使用future.cancel(true)
但是你需要确保你的任务(线程)确实考虑到线程中断状态。例如,正如您提到的,您进行了 http 调用,因此您可能需要在线程中断后立即关闭 http 客户端资源。
请参考下面的例子。
我尝试实现了任务取消场景。通常一个线程可以检查 isInterrupted()
并尝试终止自己。但是当你使用线程池执行器时,这会变得更加复杂,可调用并且任务不是真的像 while(!Thread.isInterrupted()) {// execute task}
。
在这个例子中,一个任务是写一个文件(为了简单起见,我没有使用 http 调用)。线程池执行程序启动 运行 任务,但调用者想在 100 毫秒后立即取消它。现在 future 向线程发送中断信号,但可调用任务在写入文件时无法立即检查它。因此,为了实现这一点,可调用维护了一个它将要使用的 IO 资源列表,一旦将来想要取消任务,它就在所有 IO 资源上调用 cancel()
,这会以 IOException 终止任务,然后线程完成。
public class CancellableTaskTest {
public static void main(String[] args) throws Exception {
CancellableThreadPoolExecutor threadPoolExecutor = new CancellableThreadPoolExecutor(0, 10, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
long startTime = System.currentTimeMillis();
Future<String> future = threadPoolExecutor.submit(new CancellableTask());
while (System.currentTimeMillis() - startTime < 100) {
Thread.sleep(10);
}
System.out.println("Trying to cancel task");
future.cancel(true);
}
}
class CancellableThreadPoolExecutor extends ThreadPoolExecutor {
public CancellableThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
}
@Override
protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
return new CancellableFutureTask<T>(callable);
}
}
class CancellableFutureTask<V> extends FutureTask<V> {
private WeakReference<CancellableTask> weakReference;
public CancellableFutureTask(Callable<V> callable) {
super(callable);
if (callable instanceof CancellableTask) {
this.weakReference = new WeakReference<CancellableTask>((CancellableTask) callable);
}
}
public boolean cancel(boolean mayInterruptIfRunning) {
boolean result = super.cancel(mayInterruptIfRunning);
if (weakReference != null) {
CancellableTask task = weakReference.get();
if (task != null) {
try {
task.cancel();
} catch (Exception e) {
e.printStackTrace();
result = false;
}
}
}
return result;
}
}
class CancellableTask implements Callable<String> {
private volatile boolean cancelled;
private final Object lock = new Object();
private LinkedList<Object> cancellableResources = new LinkedList<Object>();
@Override
public String call() throws Exception {
if (!cancelled) {
System.out.println("Task started");
// write file
File file = File.createTempFile("testfile", ".txt");
BufferedWriter writer = new BufferedWriter(new FileWriter(file));
synchronized (lock) {
cancellableResources.add(writer);
}
try {
long lineCount = 0;
while (lineCount++ < 100000000) {
writer.write("This is a test text at line: " + lineCount);
writer.newLine();
}
System.out.println("Task completed");
} catch (Exception e) {
e.printStackTrace();
} finally {
writer.close();
file.delete();
synchronized (lock) {
cancellableResources.clear();
}
}
}
return "done";
}
public void cancel() throws Exception {
cancelled = true;
Thread.sleep(1000);
boolean success = false;
synchronized (lock) {
for (Object cancellableResource : cancellableResources) {
if (cancellableResource instanceof Closeable) {
((Closeable) cancellableResource).close();
success = true;
}
}
}
System.out.println("Task " + (success ? "cancelled" : "could not be cancelled. It might have completed or not started at all"));
}
}
对于您的 REST Http 客户端相关要求,您可以修改工厂 class 类似这样的内容 -
public class CancellableSimpleClientHttpRequestFactory extends SimpleClientHttpRequestFactory {
private List<Object> cancellableResources;
public CancellableSimpleClientHttpRequestFactory() {
}
public CancellableSimpleClientHttpRequestFactory(List<Object> cancellableResources) {
this.cancellableResources = cancellableResources;
}
protected HttpURLConnection openConnection(URL url, Proxy proxy) throws IOException {
HttpURLConnection connection = super.openConnection(url, proxy);
if (cancellableResources != null) {
cancellableResources.add(connection);
}
return connection;
}
}
在这里,您需要在可运行任务中创建 RestTemplate
时使用此工厂。
RestTemplate template = new RestTemplate(new CancellableSimpleClientHttpRequestFactory(this.cancellableResources));
确保您传递的可取消资源列表与您在 CancellableTask
中维护的列表相同。
现在你需要像这样修改CancellableTask
中的cancel()
方法-
synchronized (lock) {
for (Object cancellableResource : cancellableResources) {
if (cancellableResource instanceof HttpURLConnection) {
((HttpURLConnection) cancellableResource).disconnect();
success = true;
}
}
}