并行的多个ThreadPoolExecutor

Multiple ThreadPoolExecutor in parallel

我有两组任务。我创建了两个 ThreadPoolExecutor,分别给它们各提供一个任务列表,并在两个执行器上都调用了 invokeAll 方法。但我发现,只有在第一个 ThreadPoolExecutor 的所有任务都完成之后,第二个 ThreadPoolExecutor 的任务才开始执行。有没有办法让它们并行启动?

/**
 * Demo from the question: creates two fixed-size pools of three threads each and hands
 * each pool its own list of {@link TWorker} tasks through {@code invokeAll}.
 */
public class Test3 {

    /**
     * Wraps every work name in a {@link TWorker}, runs the first list on the first pool
     * and the second list on the second pool, then shuts both pools down.
     *
     * @param args unused
     * @throws Exception declared so checked exceptions from the calls below propagate
     */
    public static void main(String[] args) throws Exception {
        
        // Names of the work items for each of the two pools.
        String[] works1 = {"work1", "work2", "work3", "work4", "work5"};
        String[] works2 = {"work6", "work7", "work8", "work9", "work10", "work11"};
        List<String> workList1 = Arrays.asList(works1);
        List<String> workList2 = Arrays.asList(works2);
        // invokeAll takes Callables, so each Runnable worker is adapted before being queued.
        List<Callable<Object>> workerList1 = new ArrayList<Callable<Object>>();
        List<Callable<Object>> workerList2 = new ArrayList<Callable<Object>>();
        ThreadPoolExecutor executorService1 = (ThreadPoolExecutor) Executors.newFixedThreadPool(3);
        ThreadPoolExecutor executorService2 = (ThreadPoolExecutor) Executors.newFixedThreadPool(3);
        for(int i=0; i<workList1.size(); i++) {
            Runnable worker = new TWorker(workList1.get(i));
            Callable<Object> callableWorker = Executors.callable(worker);
            workerList1.add(callableWorker);
        }
        for(int i=0; i<workList2.size(); i++) {
            Runnable worker = new TWorker(workList2.get(i));
            Callable<Object> callableWorker = Executors.callable(worker);
            workerList2.add(callableWorker);
        }
        System.out.println("Invoke all TP1");
        // ExecutorService.invokeAll returns only once every task in the list has completed,
        // so the main thread is held here until the whole first list is done.
        executorService1.invokeAll(workerList1);
        System.out.println("Invoke all TP2");
        // The second pool therefore receives its tasks only after the first list has finished.
        executorService2.invokeAll(workerList2);
        executorService1.shutdown();
        // Busy-waits (empty loop body) until the first pool reports that it has terminated.
        while(!executorService1.isTerminated()) {}
        
        executorService2.shutdown();
        // Same busy-wait for the second pool.
        while(!executorService2.isTerminated()) {}
    }
    
}

/**
 * Simulated unit of work: prints a start line, sleeps for five seconds, then prints an
 * end line. Both lines carry the name of the executing thread and the work label.
 */
class TWorker implements Runnable {

    /** Label of the work item, echoed in the start and end messages. */
    private final String work;

    /**
     * Creates a worker for one work item.
     *
     * @param work label printed in this worker's start and end messages
     */
    public TWorker(String work) {
        this.work = work;
    }

    /**
     * Prints the start message, sleeps 5000 ms and prints the end message.
     *
     * <p>If the sleep is interrupted, the stack trace is printed, the thread's interrupt
     * flag is set again, and the end message is still printed.
     */
    @Override
    public void run() {
        System.out.println("Thread : "+Thread.currentThread().getName() +" | work : "+work+" | execution start");
        try {
            Thread.sleep(5000);
        } catch (InterruptedException e) {
            e.printStackTrace();
            // Thread.sleep clears the interrupt flag when it throws; set it again so the
            // code running this task can still observe that an interruption was requested.
            Thread.currentThread().interrupt();
        }
        System.out.println("Thread : "+Thread.currentThread().getName() +" | work : "+work+" | execution end");
    }
}

您的代码稍作修改即可满足您的需求:

public class Test3 {

    public static void main(String[] args) throws Exception {
        
        CompletableFuture<Void> future1 = CompletableFuture.runAsync(new Runnable() {
            @Override
            public void run() {
                // Simulate a long-running Job
                try {
                    String[] works1 = {"work1", "work2", "work3", "work4", "work5"};
                    List<String> workList1 = Arrays.asList(works1);
                    List<Callable<Object>> workerList1 = new ArrayList<>();
                    ThreadPoolExecutor executorService1 = (ThreadPoolExecutor) Executors.newFixedThreadPool(3);
                    for(int i=0; i<workList1.size(); i++) {
                        Runnable worker = new TWorker(workList1.get(i));
                        Callable<Object> callableWorker = Executors.callable(worker);
                        workerList1.add(callableWorker);
                    }
                    System.out.println("Invoke all TP1");
                    executorService1.invokeAll(workerList1);
                    executorService1.shutdown();
                    while(!executorService1.isTerminated()) {}

                    TimeUnit.SECONDS.sleep(1);
                } catch (InterruptedException e) {
                    throw new IllegalStateException(e);
                }
                System.out.println("I'll run in a separate thread than the main thread.");
            }
        });

        // Block and wait for the future to complete
        CompletableFuture<Void> future2 = CompletableFuture.runAsync(new Runnable() {
            @Override
            public void run() {
                // Simulate a long-running Job
                try {
                    String[] works2 = {"work6", "work7", "work8", "work9", "work10", "work11"};
                    List<String> workList2 = Arrays.asList(works2);
                    List<Callable<Object>> workerList2 = new ArrayList<Callable<Object>>();
                    ThreadPoolExecutor executorService2 = (ThreadPoolExecutor) Executors.newFixedThreadPool(3);
                    for(int i=0; i<workList2.size(); i++) {
                        Runnable worker = new TWorker(workList2.get(i));
                        Callable<Object> callableWorker = Executors.callable(worker);
                        workerList2.add(callableWorker);
                    }
                    System.out.println("Invoke all TP2");
                    executorService2.invokeAll(workerList2);
                    executorService2.shutdown();
                    while(!executorService2.isTerminated()) {}

                    TimeUnit.SECONDS.sleep(1);
                } catch (InterruptedException e) {
                    throw new IllegalStateException(e);
                }
                System.out.println("I'll run in a separate thread than the main thread.");
            }
        });

        // Block and wait for the future to complete
        future1.get();
        future2.get();
    }
}

Is there a way to initiate them in parallel?

当然可以。与其将它们添加到列表中,不如直接将它们提交到线程池中。

// The variables must be typed as ExecutorService: the bare Executor interface declares
// only execute(Runnable), not submit(), shutdown() or awaitTermination().
ExecutorService executorService1 = Executors.newFixedThreadPool(3);
ExecutorService executorService2 = Executors.newFixedThreadPool(3);
// submit() queues the task and returns immediately, so the first pool's jobs are already
// running while the second pool is still being fed below.
for (String work : works1) {
    executorService1.submit(new TWorker(work));
}
// once all jobs have been submitted we can shutdown the pool;
// already-submitted jobs still run to completion
executorService1.shutdown();
for (String work : works2) {
    executorService2.submit(new TWorker(work));
}
// once all jobs have been submitted we can shutdown the pool
executorService2.shutdown();
// Block (without spinning) until each pool has finished all of its jobs.
executorService1.awaitTermination(Long.MAX_VALUE, TimeUnit.MILLISECONDS);
executorService2.awaitTermination(Long.MAX_VALUE, TimeUnit.MILLISECONDS);

作业一经提交就会开始并行运行。提交完最后一个作业后即可关闭线程池,已提交的作业仍会在后台继续运行。最后,不要在 !executorService1.isTerminated() 这样的紧凑循环中空转(这绝不是一个好主意),改用 awaitTermination(...) 即可。