如何正确使用Java执行器?

How to properly use Java Executor?

我在我的多线程应用程序中使用了 Java 执行器,但我似乎无法弄清楚什么时候使用以下每种方法最好:

1.

ExecutorService executor=Executors.newFixedThreadPool(50);
executor.execute(new A_Runner(... some parameter ...));
executor.shutdown();
while (!executor.isTerminated()) { Thread.sleep(100); }

2.

int Page_Count=200;
ExecutorService executor=Executors.newFixedThreadPool(50);
doneSignal=new CountDownLatch(Page_Count);
for (int i=0;i<Page_Count;i++) executor.execute(new A_Runner(doneSignal, ... some parameter ...));
doneSignal.await();
executor.shutdown();
while (!executor.isTerminated()) { Thread.sleep(100); }

3.

int Executor_Count=30;
ThreadPoolExecutor executor=new ThreadPoolExecutor(Executor_Count,Executor_Count*2,1,TimeUnit.SECONDS,new LinkedBlockingQueue());
List<Future<String>> futures=new ArrayList<>(3330);

for (int i=0;i<50;i++) futures.add(executor.submit(new A_Runner(... some parameter ...));
executor.shutdown();
while (!executor.isTerminated()) { executor.awaitTermination(1,TimeUnit.SECONDS); }
for (Future<String> future : futures)
{
    String f=future.get();
    // ...
}

具体来说,在[2]中,如果我跳过doneSignal,那么它就像[1]一样,那么doneSignal有什么用呢?

此外,在 [3] 中,如果我添加一个 doneSignal 会怎么样?或者有可能吗?

我想知道的是:这些方法是否可以互换,或者在某种情况下我应该使用上面的特定类型?

  1. ExecutorService

    ExecutorService executor=Executors.newFixedThreadPool(50);

    简单易用。它隐藏了 ThreadPoolExecutor.

    的底层细节

    Callable/Runnable 任务数量较少并且在无界队列中堆积任务不会增加内存和降低系统性能时,更喜欢这个。如果您有 CPU/Memory 约束,请使用 ThreadPoolExecutor 和容量约束 & RejectedExecutionHandler 来处理拒绝任务。

  2. CountDownLatch

    您已使用给定计数初始化 CountDownLatch。此计数会因调用 countDown() 方法而递减。我假设您稍后在 Runnable 任务中调用递减。等待此计数达到零的线程可以调用 await() 方法之一。调用 await() 会阻塞线程,直到计数达到零。 这个 class 使 java 线程可以等待,直到其他线程集完成它们的任务。

    用例:

    1. Achieving Maximum Parallelism: 有时我们想同时启动多个线程来达到最大并行

    2. 等待N个线程完成后开始执行

    3. 死锁检测。

      查看 Lokesh Gupta 的 article 了解更多详情。

  3. ThreadPoolExecutor : It provides more control to finetune various thread pool parameters. If your application is constrained by number of active Runnable/Callable tasks, you should use bounded queue by setting the max capacity. Once the queue reaches maximum capacity, you can define RejectionHandler. Java provides four types of RejectedExecutionHandler policies.

    1. 在默认 ThreadPoolExecutor.AbortPolicy 中,处理程序在拒绝时抛出运行时 RejectedExecutionException。

    2. ThreadPoolExecutor.CallerRunsPolicy中,调用execute的线程自己运行任务。这提供了一种简单的反馈控制机制,可以减慢提交新任务的速度。

    3. ThreadPoolExecutor.DiscardPolicy中,无法执行的任务被简单地丢弃。

    4. ThreadPoolExecutor.DiscardOldestPolicy中,如果执行器没有关闭,则丢弃工作队列头部的任务,然后重试执行(可能会再次失败,导致这个重复。)

      如果你想模拟CountDownLatch行为,你可以使用invokeAll()方法。

  4. 你没有引用的另一种机制是 ForkJoinPool

    ForkJoinPool在Java中添加到Java 7.ForkJoinPool类似于 Java ExecutorService 但有一点不同。 ForkJoinPool 做到了 任务很容易将他们的工作分解成更小的任务,然后 也提交给了ForkJoinPool。当空闲工作线程从繁忙的工作线程队列中窃取任务时,任务窃取发生在 ForkJoinPool 中。

    Java 8 在ExecutorService中又引入了一个API来创建工作窃取池。您不必创建 RecursiveTaskRecursiveAction 但仍然可以使用 ForkJoinPool.

      public static ExecutorService newWorkStealingPool()
    

    Creates a work-stealing thread pool using all available processors as its target parallelism level.

    默认以CPU核数为参数。

这四种机制互为补充。根据您要控制的粒度级别,您必须选择正确的粒度级别。