使用 CompletionService 时强制执行 executorService.awaitTermination
Enforce executorService.awaitTermination while using CompletionService
我正在尝试提交多个任务并在结果可用时获取结果。但是,在循环结束后,我必须强制所有任务在指定的时间内完成。如果不是,则抛出错误。最初,我只有 executorService 的 invokeAll、shutdown 和 awaitTermination calls that were used to ensure that all tasks complete (inspite of errors or not). I migrated the code to use CompletionService 来显示结果。我可以在哪里执行 CompletionService 调用中的 awaitTermination 子句?
CompletionService<String> completionService = new ExecutorCompletionService<String>(executor);
logger.info("Submitting all tasks");
for (Callable<String> task : tasks)
completionService.submit(task);
executor.shutdown();
logger.info("Tasks submitted. Now checking the status.");
while (!executor.isTerminated())
{
final Future<String> future = completionService.take();
String itemValue;
try
{
itemValue = future.get();
if (!itemValue.equals("Bulk"))
logger.info("Backup completed for " + itemValue);
}
catch (InterruptedException | ExecutionException e)
{
String message = e.getCause().getMessage();
String objName = "Bulk";
if (message.contains("(") && message.contains(")"))
objName = message.substring(message.indexOf("(") + 1, message.indexOf(")"));
logger.error("Failed retrieving the task status for " + objName, e);
}
}
executor.awaitTermination(24, TimeUnit.HOURS);
换句话说,我如何利用 CompletionService 的超时时间?
编辑:
我的初始代码如下所示。问题是我正在遍历未来列表,然后将它们打印为已完成。但是,我的要求是显示以 FCFS 为基础完成的那些。
List<Future<String>> results = executor.invokeAll(tasks);
executor.shutdown();
executor.awaitTermination(24, TimeUnit.HOURS);
while (results.size() > 0)
{
for (Iterator<Future<String>> iterator = results.iterator(); iterator.hasNext();)
{
Future<String> item = iterator.next();
if (item.isDone())
{
String itemValue;
try
{
itemValue = item.get();
if (!itemValue.equals("Bulk"))
logger.info("Backup completed for " + itemValue);
}
catch (InterruptedException | ExecutionException e)
{
String message = e.getCause().getMessage();
String objName = "Bulk";
if (message.contains("(") && message.contains(")"))
objName = message.substring(message.indexOf("(") + 1, message.indexOf(")"));
logger.error("Failed retrieving the task status for " + objName, e);
}
finally
{
iterator.remove();
}
}
}
}
我建议您等待执行程序在另一个线程上终止
这样您就可以实现服务结果 FCFS 并强制执行超时。
可以通过如下所示的方式轻松实现
CompletionService<String> completionService = new ExecutorCompletionService<String>(executor);
// place all the work in a function (an Anonymous Runnable in this case)
// completionService.submit(() ->{work});
// as soon as the work is submitted it is handled by another Thread
completionService.submit(() ->{
logger.info("Submitting all tasks");
for (Callable<String> task : tasks)
completionService.submit(task);
logger.info("Tasks submitted. Now checking the status.");
int counter = tasks.size();
for(int i = counter; counter >=1; counter--) // Replaced the while loop
{
final Future<String> future = completionService.take();
String itemValue;
try
{
itemValue = future.get();
if (!itemValue.equals("Bulk"))
logger.info("Backup completed for " + itemValue);
}
catch (InterruptedException | ExecutionException e)
{
String message = e.getCause().getMessage();
String objName = "Bulk";
if (message.contains("(") && message.contains(")"))
objName = message.substring(message.indexOf("(") + 1, message.indexOf(")"));
logger.error("Failed retrieving the task status for " + objName, e);
}
}
});
// After submitting the work to another Thread
// Wait in your Main Thread, and enforce termination if needed
shutdownAndAwaitTermination(executor);
您使用此处理执行程序终止和等待(取自 ExecutorsService)
void shutdownAndAwaitTermination(ExecutorService pool) {
pool.shutdown(); // Disable new tasks from being submitted
try {
// Wait a while for existing tasks to terminate
if (!pool.awaitTermination(24, TimeUnit.HOURS)) {
pool.shutdownNow(); // Cancel currently executing tasks
// Wait a while for tasks to respond to being cancelled
if (!pool.awaitTermination(60, TimeUnit.SECONDS))
System.err.println("Pool did not terminate");
}
} catch (InterruptedException ie) {
// (Re-)Cancel if current thread also interrupted
pool.shutdownNow();
// Preserve interrupt status
Thread.currentThread().interrupt();
}
}
那好吧,你需要监控完成。那么,为什么你不按照文档使用呢? https://docs.oracle.com/javase/7/docs/api/java/util/concurrent/ExecutorCompletionService.html 因此,它将 n
任务提交给 ExecutorCompletionService
的新实例并等待 n
完成。不再终止,你可以重用同一个执行器(通常是线程池,创建一个新线程比从池中重用更昂贵)。因此,如果我将文档中的代码改编为您的场景,它将类似于:
CompletionService<Result> ecs
= new ExecutorCompletionService<String>(executor);
for (Callable<Result> task : tasks)
ecs.submit(task);
logger.info("Tasks submitted. Now checking the status.");
int n = tasks.size();
for (int i = 0; i < n; ++i) {
try {
String r = ecs.take().get();
logger.info("Backup completed for " + r);
}
catch(InterruptedException | ExecutionException e) {
...
}
}
此外,解析异常消息不是个好主意,如果您创建自定义异常 class 并使用 instanceof
.
会更好
如果您需要超时才能完成 - 使用带有时间参数的 poll
而不是 take
。
我正在尝试提交多个任务并在结果可用时获取结果。但是,在循环结束后,我必须强制所有任务在指定的时间内完成。如果不是,则抛出错误。最初,我只有 executorService 的 invokeAll、shutdown 和 awaitTermination calls that were used to ensure that all tasks complete (inspite of errors or not). I migrated the code to use CompletionService 来显示结果。我可以在哪里执行 CompletionService 调用中的 awaitTermination 子句?
CompletionService<String> completionService = new ExecutorCompletionService<String>(executor);
logger.info("Submitting all tasks");
for (Callable<String> task : tasks)
completionService.submit(task);
executor.shutdown();
logger.info("Tasks submitted. Now checking the status.");
while (!executor.isTerminated())
{
final Future<String> future = completionService.take();
String itemValue;
try
{
itemValue = future.get();
if (!itemValue.equals("Bulk"))
logger.info("Backup completed for " + itemValue);
}
catch (InterruptedException | ExecutionException e)
{
String message = e.getCause().getMessage();
String objName = "Bulk";
if (message.contains("(") && message.contains(")"))
objName = message.substring(message.indexOf("(") + 1, message.indexOf(")"));
logger.error("Failed retrieving the task status for " + objName, e);
}
}
executor.awaitTermination(24, TimeUnit.HOURS);
换句话说,我如何利用 CompletionService 的超时时间?
编辑:
我的初始代码如下所示。问题是我正在遍历未来列表,然后将它们打印为已完成。但是,我的要求是显示以 FCFS 为基础完成的那些。
List<Future<String>> results = executor.invokeAll(tasks);
executor.shutdown();
executor.awaitTermination(24, TimeUnit.HOURS);
while (results.size() > 0)
{
for (Iterator<Future<String>> iterator = results.iterator(); iterator.hasNext();)
{
Future<String> item = iterator.next();
if (item.isDone())
{
String itemValue;
try
{
itemValue = item.get();
if (!itemValue.equals("Bulk"))
logger.info("Backup completed for " + itemValue);
}
catch (InterruptedException | ExecutionException e)
{
String message = e.getCause().getMessage();
String objName = "Bulk";
if (message.contains("(") && message.contains(")"))
objName = message.substring(message.indexOf("(") + 1, message.indexOf(")"));
logger.error("Failed retrieving the task status for " + objName, e);
}
finally
{
iterator.remove();
}
}
}
}
我建议您等待执行程序在另一个线程上终止
这样您就可以实现服务结果 FCFS 并强制执行超时。
可以通过如下所示的方式轻松实现
CompletionService<String> completionService = new ExecutorCompletionService<String>(executor);
// place all the work in a function (an Anonymous Runnable in this case)
// completionService.submit(() ->{work});
// as soon as the work is submitted it is handled by another Thread
completionService.submit(() ->{
logger.info("Submitting all tasks");
for (Callable<String> task : tasks)
completionService.submit(task);
logger.info("Tasks submitted. Now checking the status.");
int counter = tasks.size();
for(int i = counter; counter >=1; counter--) // Replaced the while loop
{
final Future<String> future = completionService.take();
String itemValue;
try
{
itemValue = future.get();
if (!itemValue.equals("Bulk"))
logger.info("Backup completed for " + itemValue);
}
catch (InterruptedException | ExecutionException e)
{
String message = e.getCause().getMessage();
String objName = "Bulk";
if (message.contains("(") && message.contains(")"))
objName = message.substring(message.indexOf("(") + 1, message.indexOf(")"));
logger.error("Failed retrieving the task status for " + objName, e);
}
}
});
// After submitting the work to another Thread
// Wait in your Main Thread, and enforce termination if needed
shutdownAndAwaitTermination(executor);
您使用此处理执行程序终止和等待(取自 ExecutorsService)
void shutdownAndAwaitTermination(ExecutorService pool) {
pool.shutdown(); // Disable new tasks from being submitted
try {
// Wait a while for existing tasks to terminate
if (!pool.awaitTermination(24, TimeUnit.HOURS)) {
pool.shutdownNow(); // Cancel currently executing tasks
// Wait a while for tasks to respond to being cancelled
if (!pool.awaitTermination(60, TimeUnit.SECONDS))
System.err.println("Pool did not terminate");
}
} catch (InterruptedException ie) {
// (Re-)Cancel if current thread also interrupted
pool.shutdownNow();
// Preserve interrupt status
Thread.currentThread().interrupt();
}
}
那好吧,你需要监控完成。那么,为什么你不按照文档使用呢? https://docs.oracle.com/javase/7/docs/api/java/util/concurrent/ExecutorCompletionService.html 因此,它将 n
任务提交给 ExecutorCompletionService
的新实例并等待 n
完成。不再终止,你可以重用同一个执行器(通常是线程池,创建一个新线程比从池中重用更昂贵)。因此,如果我将文档中的代码改编为您的场景,它将类似于:
CompletionService<Result> ecs
= new ExecutorCompletionService<String>(executor);
for (Callable<Result> task : tasks)
ecs.submit(task);
logger.info("Tasks submitted. Now checking the status.");
int n = tasks.size();
for (int i = 0; i < n; ++i) {
try {
String r = ecs.take().get();
logger.info("Backup completed for " + r);
}
catch(InterruptedException | ExecutionException e) {
...
}
}
此外,解析异常消息不是个好主意,如果您创建自定义异常 class 并使用 instanceof
.
如果您需要超时才能完成 - 使用带有时间参数的 poll
而不是 take
。