ExecutorService通过多个线程执行单个任务n次(n线程"races")
ExecutorService to execute a single task n times by multiple threads (n thread "races")
我需要通过多个线程执行单个任务,这样当第一个线程完成并且在任何其他线程完成之前,所有线程都将停止并重新开始同一个任务。这应该执行n次。
我尝试使用 Callable<V>
和方法 invokeAny()
(这就是我使用该集合的原因)但不确定如何实现目标。
ExecutorService executor = Executors.newFixedThreadPool(10);
Callable<String> task = () -> {
someTask();
return "";
};
Set<Callable<String>> tasks = new HashSet<>();
IntStream.range(0, n).forEach(i -> {
tasks.add(task);
executor.submit(task);
});
如何完成这个?或任何更好的解决方案?
这是一个建议:
class Task implements Callable<Integer> {
private final static Random RND = new Random();
@Override
public Integer call() throws Exception {
try {
// Work on task for a random duration
Thread.sleep(RND.nextInt(5000));
} catch (InterruptedException e) {
System.err.println("I was interrupted."
+ "Someone else probably solved the task before me.");
return -1;
}
// Return some dummy value
return RND.nextInt();
}
}
class Scratch {
public static void main(String[] args) throws InterruptedException {
final int numWorkers = 3; // number of tasks to run in parallel
ExecutorService executor = Executors.newFixedThreadPool(numWorkers);
// Solve task 5 times. (Change it to while (true) { ...} if you like.)
for (int i = 0; i < 5; i++) {
CompletionService<Integer> completionService =
new ExecutorCompletionService<>(executor);
Future<?>[] futures = new Future<?>[numWorkers];
for (int j = 0; j < numWorkers; j++) {
futures[j] = completionService.submit(new Task());
}
Future<Integer> firstToComplete = completionService.take();
try {
Integer result = firstToComplete.get();
System.err.println("We got a result: " + result);
} catch (ExecutionException e) {
// Should not happen. Future has completed.
}
// Cancel all futures (it doesn't matter that we're cancelling
// the one that has already completed).
for (int j = 0; j < numWorkers; j++) {
futures[j].cancel(true);
}
}
executor.shutdown();
}
}
如果您正在解决的任务不响应中断,将 true
传递给 cancel(...)
将无济于事。在这种情况下,我建议您进行以下更改:
- 在外部
for
循环中创建一个 AtomicBoolean done
变量。
- 将其传递给
Task
的构造函数并将其保存在 Task
中的字段中。
- 在任务解决过程中,经常检查
done
标志,如果done
是true
,则取消尝试。
- 不是在第一个结果出现后对任务调用
cancel
,而是将 done
设置为 true
并等待其他线程 return。
我需要通过多个线程执行单个任务,这样当第一个线程完成并且在任何其他线程完成之前,所有线程都将停止并重新开始同一个任务。这应该执行n次。
我尝试使用 Callable<V>
和方法 invokeAny()
(这就是我使用该集合的原因)但不确定如何实现目标。
ExecutorService executor = Executors.newFixedThreadPool(10);
Callable<String> task = () -> {
someTask();
return "";
};
Set<Callable<String>> tasks = new HashSet<>();
IntStream.range(0, n).forEach(i -> {
tasks.add(task);
executor.submit(task);
});
如何完成这个?或任何更好的解决方案?
这是一个建议:
class Task implements Callable<Integer> {
private final static Random RND = new Random();
@Override
public Integer call() throws Exception {
try {
// Work on task for a random duration
Thread.sleep(RND.nextInt(5000));
} catch (InterruptedException e) {
System.err.println("I was interrupted."
+ "Someone else probably solved the task before me.");
return -1;
}
// Return some dummy value
return RND.nextInt();
}
}
class Scratch {
public static void main(String[] args) throws InterruptedException {
final int numWorkers = 3; // number of tasks to run in parallel
ExecutorService executor = Executors.newFixedThreadPool(numWorkers);
// Solve task 5 times. (Change it to while (true) { ...} if you like.)
for (int i = 0; i < 5; i++) {
CompletionService<Integer> completionService =
new ExecutorCompletionService<>(executor);
Future<?>[] futures = new Future<?>[numWorkers];
for (int j = 0; j < numWorkers; j++) {
futures[j] = completionService.submit(new Task());
}
Future<Integer> firstToComplete = completionService.take();
try {
Integer result = firstToComplete.get();
System.err.println("We got a result: " + result);
} catch (ExecutionException e) {
// Should not happen. Future has completed.
}
// Cancel all futures (it doesn't matter that we're cancelling
// the one that has already completed).
for (int j = 0; j < numWorkers; j++) {
futures[j].cancel(true);
}
}
executor.shutdown();
}
}
如果您正在解决的任务不响应中断,将 true
传递给 cancel(...)
将无济于事。在这种情况下,我建议您进行以下更改:
- 在外部
for
循环中创建一个AtomicBoolean done
变量。 - 将其传递给
Task
的构造函数并将其保存在Task
中的字段中。 - 在任务解决过程中,经常检查
done
标志,如果done
是true
,则取消尝试。 - 不是在第一个结果出现后对任务调用
cancel
,而是将done
设置为true
并等待其他线程 return。