通过 CompletionService 查看取消的任务
Check cancelled tasks through CompletionService
我正在调查 CompletionService class,我发现提交队列与完成队列的分离非常有用。
但我也想念 poll/take 取消任务的方法(可以认为在某种程度上已完成)。是否可以轻松完成?
Future<String> task1Future = completionService.submit(myCallableTask1);
Future<String> task2Future = completionService.submit(myCallableTask2);
task2Future.cancel();
Future<String> lastComplTaskFuture = completionService.take();
//Seems to return only the completed or
//interrupted tasks, not those cancelled (task2)
编辑:检查了一些答案后,我意识到发生了什么。
CompletitionService returns 与提交的作业顺序相同。
如果你 运行 工作 a、b 和 c;在a工作时取消b和c;最后轮询completitionService,直到a任务结束才会通知b和c的取消。
另外,我意识到如果你关闭执行器而不是取消单个任务,那些仍在队列中的任务不会到达 completionservice,即使它们的取消已经完成而活动任务还没有结束。
EDIT2:好的,我添加了一个完整的测试用例
import java.util.concurrent.*;
public class CompletionTest {
public static void main(String[] args){
CompletionService<String> completion =
new ExecutorCompletionService<String>(Executors.newSingleThreadExecutor());
Future<String> aFuture =
completion.submit(() -> {Thread.sleep(10000); return "A";});
completion.submit(() -> {return "B";}).cancel(true);
completion.submit(() -> {return "C";}).cancel(true);
completion.submit(() -> {return "D";}).cancel(true);
long arbitraryTime = System.currentTimeMillis();
while (true){
String result = getNextResult(completion);
System.out.println(result);
if (System.currentTimeMillis() > arbitraryTime+5000){
aFuture.cancel(true);
}
}
}
public static String getNextResult(CompletionService<String> service){
try {
String taskId = service.take().get();
return "'"+taskId+"' job completed successfully.";
}
catch (InterruptedException e ) { return "Unknown job was interrupted/stopped while waiting (no thread.wait() or infinite loop in the job so I guess this should not be possible)."; }
catch (CancellationException e) { return "Unknown job was cancelled."; }
catch (ExecutionException e) {
return "Unknown job returned with the following internal exception while running: " + e.getCause().getMessage();
}
}
}
我期望这样的输出:
Unknown job was cancelled.
Unknown job was cancelled.
Unknown job was cancelled.
'A' job completed successfully.
但它是:
'A' job completed successfully.
Unknown job was cancelled.
Unknown job was cancelled.
Unknown job was cancelled.
我什至尝试使用 PriorityBlockingQueue 作为 CompletionService 的队列,使用 Comparator.<Future<String>,Boolean>comparing(Future::isCancelled).reversed()
但它也没有用(我想如果元素改变状态它不会求助)
您正在使用单线程向线程池提交任务。
您提交的第一个任务将消耗该线程(休眠一小段时间)。
其他人将在 ExecutorService
内排队。取消相应的期货不会从该队列中删除相应的任务。它只是改变 Future
的状态。
任务将保留在队列中,直到有线程空闲来处理它们。线程会注意到相应的 Future
被取消并取消整个任务。那是 CompletionService
知道他们是 "done" 的时候,很快。
考虑增加线程池的大小。
我正在调查 CompletionService class,我发现提交队列与完成队列的分离非常有用。
但我也想念 poll/take 取消任务的方法(可以认为在某种程度上已完成)。是否可以轻松完成?
Future<String> task1Future = completionService.submit(myCallableTask1);
Future<String> task2Future = completionService.submit(myCallableTask2);
task2Future.cancel();
Future<String> lastComplTaskFuture = completionService.take();
//Seems to return only the completed or
//interrupted tasks, not those cancelled (task2)
编辑:检查了一些答案后,我意识到发生了什么。 CompletitionService returns 与提交的作业顺序相同。 如果你 运行 工作 a、b 和 c;在a工作时取消b和c;最后轮询completitionService,直到a任务结束才会通知b和c的取消。 另外,我意识到如果你关闭执行器而不是取消单个任务,那些仍在队列中的任务不会到达 completionservice,即使它们的取消已经完成而活动任务还没有结束。
EDIT2:好的,我添加了一个完整的测试用例
import java.util.concurrent.*;
public class CompletionTest {
public static void main(String[] args){
CompletionService<String> completion =
new ExecutorCompletionService<String>(Executors.newSingleThreadExecutor());
Future<String> aFuture =
completion.submit(() -> {Thread.sleep(10000); return "A";});
completion.submit(() -> {return "B";}).cancel(true);
completion.submit(() -> {return "C";}).cancel(true);
completion.submit(() -> {return "D";}).cancel(true);
long arbitraryTime = System.currentTimeMillis();
while (true){
String result = getNextResult(completion);
System.out.println(result);
if (System.currentTimeMillis() > arbitraryTime+5000){
aFuture.cancel(true);
}
}
}
public static String getNextResult(CompletionService<String> service){
try {
String taskId = service.take().get();
return "'"+taskId+"' job completed successfully.";
}
catch (InterruptedException e ) { return "Unknown job was interrupted/stopped while waiting (no thread.wait() or infinite loop in the job so I guess this should not be possible)."; }
catch (CancellationException e) { return "Unknown job was cancelled."; }
catch (ExecutionException e) {
return "Unknown job returned with the following internal exception while running: " + e.getCause().getMessage();
}
}
}
我期望这样的输出:
Unknown job was cancelled.
Unknown job was cancelled.
Unknown job was cancelled.
'A' job completed successfully.
但它是:
'A' job completed successfully.
Unknown job was cancelled.
Unknown job was cancelled.
Unknown job was cancelled.
我什至尝试使用 PriorityBlockingQueue 作为 CompletionService 的队列,使用 Comparator.<Future<String>,Boolean>comparing(Future::isCancelled).reversed()
但它也没有用(我想如果元素改变状态它不会求助)
您正在使用单线程向线程池提交任务。
您提交的第一个任务将消耗该线程(休眠一小段时间)。
其他人将在 ExecutorService
内排队。取消相应的期货不会从该队列中删除相应的任务。它只是改变 Future
的状态。
任务将保留在队列中,直到有线程空闲来处理它们。线程会注意到相应的 Future
被取消并取消整个任务。那是 CompletionService
知道他们是 "done" 的时候,很快。
考虑增加线程池的大小。