ExecutorService通过多个线程执行单个任务n次(n线程"races")

ExecutorService to execute a single task n times by multiple threads (n thread "races")

我需要通过多个线程执行单个任务,这样当第一个线程完成并且在任何其他线程完成之前,所有线程都将停止并重新开始同一个任务。这应该执行n次。

我尝试使用 Callable<V> 和方法 invokeAny()(这就是我使用该集合的原因)但不确定如何实现目标。

ExecutorService executor = Executors.newFixedThreadPool(10);
Callable<String> task = () -> {
    someTask();
    return "";
};
Set<Callable<String>> tasks = new HashSet<>();
IntStream.range(0, n).forEach(i -> {
    tasks.add(task);
    executor.submit(task);
});

如何完成这个?或任何更好的解决方案?

这是一个建议:

class Task implements Callable<Integer> {

    private final static Random RND = new Random();

    @Override
    public Integer call() throws Exception {
        try {
            // Work on task for a random duration
            Thread.sleep(RND.nextInt(5000));
        } catch (InterruptedException e) {
            System.err.println("I was interrupted."
                    + "Someone else probably solved the task before me.");
            return -1;
        }

        // Return some dummy value
        return RND.nextInt();
    }
}

class Scratch {
    public static void main(String[] args) throws InterruptedException {

        final int numWorkers = 3; // number of tasks to run in parallel

        ExecutorService executor = Executors.newFixedThreadPool(numWorkers);

        // Solve task 5 times. (Change it to while (true) { ...} if you like.)
        for (int i = 0; i < 5; i++) {

            CompletionService<Integer> completionService =
                    new ExecutorCompletionService<>(executor);

            Future<?>[] futures = new Future<?>[numWorkers];
            for (int j = 0; j < numWorkers; j++) {
                futures[j] = completionService.submit(new Task());
            }

            Future<Integer> firstToComplete = completionService.take();

            try {
                Integer result = firstToComplete.get();
                System.err.println("We got a result: " + result);
            } catch (ExecutionException e) {
                // Should not happen. Future has completed.
            }

            // Cancel all futures (it doesn't matter that we're cancelling
            // the one that has already completed).
            for (int j = 0; j < numWorkers; j++) {
                futures[j].cancel(true);
            }
        }

        executor.shutdown();
    }
}

如果您正在解决的任务不响应中断,将 true 传递给 cancel(...) 将无济于事。在这种情况下,我建议您进行以下更改:

  1. 在外部 for 循环中创建一个 AtomicBoolean done 变量。
  2. 将其传递给 Task 的构造函数并将其保存在 Task 中的字段中。
  3. 在任务解决过程中,经常检查done标志,如果donetrue,则取消尝试。
  4. 不是在第一个结果出现后对任务调用 cancel,而是将 done 设置为 true 并等待其他线程 return。