如何正确使用Java执行器?
How to properly use Java Executor?
我在我的多线程应用程序中使用了 Java 执行器,但我似乎无法弄清楚什么时候使用以下每种方法最好:
1.
ExecutorService executor=Executors.newFixedThreadPool(50);
executor.execute(new A_Runner(... some parameter ...));
executor.shutdown();
while (!executor.isTerminated()) { Thread.sleep(100); }
2.
int Page_Count=200;
ExecutorService executor=Executors.newFixedThreadPool(50);
doneSignal=new CountDownLatch(Page_Count);
for (int i=0;i<Page_Count;i++) executor.execute(new A_Runner(doneSignal, ... some parameter ...));
doneSignal.await();
executor.shutdown();
while (!executor.isTerminated()) { Thread.sleep(100); }
3.
int Executor_Count=30;
ThreadPoolExecutor executor=new ThreadPoolExecutor(Executor_Count,Executor_Count*2,1,TimeUnit.SECONDS,new LinkedBlockingQueue());
List<Future<String>> futures=new ArrayList<>(3330);
for (int i=0;i<50;i++) futures.add(executor.submit(new A_Runner(... some parameter ...));
executor.shutdown();
while (!executor.isTerminated()) { executor.awaitTermination(1,TimeUnit.SECONDS); }
for (Future<String> future : futures)
{
String f=future.get();
// ...
}
具体来说,在[2]中,如果我跳过doneSignal,那么它就像[1]一样,那么doneSignal有什么用呢?
此外,在 [3] 中,如果我添加一个 doneSignal 会怎么样?或者有可能吗?
我想知道的是:这些方法是否可以互换,或者在某种情况下我应该使用上面的特定类型?
-
ExecutorService executor=Executors.newFixedThreadPool(50);
简单易用。它隐藏了 ThreadPoolExecutor
.
的底层细节
当 Callable/Runnable
任务数量较少并且在无界队列中堆积任务不会增加内存和降低系统性能时,更喜欢这个。如果您有 CPU/Memory
约束,请使用 ThreadPoolExecutor
和容量约束 & RejectedExecutionHandler
来处理拒绝任务。
-
您已使用给定计数初始化 CountDownLatch
。此计数会因调用 countDown()
方法而递减。我假设您稍后在 Runnable 任务中调用递减。等待此计数达到零的线程可以调用 await()
方法之一。调用 await()
会阻塞线程,直到计数达到零。 这个 class 使 java 线程可以等待,直到其他线程集完成它们的任务。
用例:
Achieving Maximum Parallelism: 有时我们想同时启动多个线程来达到最大并行
等待N个线程完成后开始执行
死锁检测。
查看 Lokesh Gupta 的 article 了解更多详情。
ThreadPoolExecutor : It provides more control to finetune various thread pool parameters. If your application is constrained by number of active Runnable/Callable
tasks, you should use bounded queue by setting the max capacity. Once the queue reaches maximum capacity, you can define RejectionHandler. Java provides four types of RejectedExecutionHandler
policies.
在默认 ThreadPoolExecutor.AbortPolicy
中,处理程序在拒绝时抛出运行时 RejectedExecutionException。
在ThreadPoolExecutor.CallerRunsPolicy
中,调用execute的线程自己运行任务。这提供了一种简单的反馈控制机制,可以减慢提交新任务的速度。
在ThreadPoolExecutor.DiscardPolicy
中,无法执行的任务被简单地丢弃。
在ThreadPoolExecutor.DiscardOldestPolicy
中,如果执行器没有关闭,则丢弃工作队列头部的任务,然后重试执行(可能会再次失败,导致这个重复。)
如果你想模拟CountDownLatch
行为,你可以使用invokeAll()
方法。
你没有引用的另一种机制是 ForkJoinPool
ForkJoinPool
在Java中添加到Java 7.ForkJoinPool
类似于
Java ExecutorService
但有一点不同。 ForkJoinPool
做到了
任务很容易将他们的工作分解成更小的任务,然后
也提交给了ForkJoinPool
。当空闲工作线程从繁忙的工作线程队列中窃取任务时,任务窃取发生在 ForkJoinPool
中。
Java 8 在ExecutorService中又引入了一个API来创建工作窃取池。您不必创建 RecursiveTask
和 RecursiveAction
但仍然可以使用 ForkJoinPool
.
public static ExecutorService newWorkStealingPool()
Creates a work-stealing thread pool using all available processors as its target parallelism level.
默认以CPU核数为参数。
这四种机制互为补充。根据您要控制的粒度级别,您必须选择正确的粒度级别。
我在我的多线程应用程序中使用了 Java 执行器,但我似乎无法弄清楚什么时候使用以下每种方法最好:
1.
ExecutorService executor=Executors.newFixedThreadPool(50);
executor.execute(new A_Runner(... some parameter ...));
executor.shutdown();
while (!executor.isTerminated()) { Thread.sleep(100); }
2.
int Page_Count=200;
ExecutorService executor=Executors.newFixedThreadPool(50);
doneSignal=new CountDownLatch(Page_Count);
for (int i=0;i<Page_Count;i++) executor.execute(new A_Runner(doneSignal, ... some parameter ...));
doneSignal.await();
executor.shutdown();
while (!executor.isTerminated()) { Thread.sleep(100); }
3.
int Executor_Count=30;
ThreadPoolExecutor executor=new ThreadPoolExecutor(Executor_Count,Executor_Count*2,1,TimeUnit.SECONDS,new LinkedBlockingQueue());
List<Future<String>> futures=new ArrayList<>(3330);
for (int i=0;i<50;i++) futures.add(executor.submit(new A_Runner(... some parameter ...));
executor.shutdown();
while (!executor.isTerminated()) { executor.awaitTermination(1,TimeUnit.SECONDS); }
for (Future<String> future : futures)
{
String f=future.get();
// ...
}
具体来说,在[2]中,如果我跳过doneSignal,那么它就像[1]一样,那么doneSignal有什么用呢?
此外,在 [3] 中,如果我添加一个 doneSignal 会怎么样?或者有可能吗?
我想知道的是:这些方法是否可以互换,或者在某种情况下我应该使用上面的特定类型?
-
ExecutorService executor=Executors.newFixedThreadPool(50);
简单易用。它隐藏了
的底层细节ThreadPoolExecutor
.当
Callable/Runnable
任务数量较少并且在无界队列中堆积任务不会增加内存和降低系统性能时,更喜欢这个。如果您有CPU/Memory
约束,请使用ThreadPoolExecutor
和容量约束 &RejectedExecutionHandler
来处理拒绝任务。 -
您已使用给定计数初始化
CountDownLatch
。此计数会因调用countDown()
方法而递减。我假设您稍后在 Runnable 任务中调用递减。等待此计数达到零的线程可以调用await()
方法之一。调用await()
会阻塞线程,直到计数达到零。 这个 class 使 java 线程可以等待,直到其他线程集完成它们的任务。用例:
Achieving Maximum Parallelism: 有时我们想同时启动多个线程来达到最大并行
等待N个线程完成后开始执行
死锁检测。
查看 Lokesh Gupta 的 article 了解更多详情。
ThreadPoolExecutor : It provides more control to finetune various thread pool parameters. If your application is constrained by number of active
Runnable/Callable
tasks, you should use bounded queue by setting the max capacity. Once the queue reaches maximum capacity, you can define RejectionHandler. Java provides four types ofRejectedExecutionHandler
policies.在默认
ThreadPoolExecutor.AbortPolicy
中,处理程序在拒绝时抛出运行时 RejectedExecutionException。在
ThreadPoolExecutor.CallerRunsPolicy
中,调用execute的线程自己运行任务。这提供了一种简单的反馈控制机制,可以减慢提交新任务的速度。在
ThreadPoolExecutor.DiscardPolicy
中,无法执行的任务被简单地丢弃。在
ThreadPoolExecutor.DiscardOldestPolicy
中,如果执行器没有关闭,则丢弃工作队列头部的任务,然后重试执行(可能会再次失败,导致这个重复。)如果你想模拟
CountDownLatch
行为,你可以使用invokeAll()
方法。
你没有引用的另一种机制是 ForkJoinPool
ForkJoinPool
在Java中添加到Java 7.ForkJoinPool
类似于 JavaExecutorService
但有一点不同。ForkJoinPool
做到了 任务很容易将他们的工作分解成更小的任务,然后 也提交给了ForkJoinPool
。当空闲工作线程从繁忙的工作线程队列中窃取任务时,任务窃取发生在ForkJoinPool
中。Java 8 在ExecutorService中又引入了一个API来创建工作窃取池。您不必创建
RecursiveTask
和RecursiveAction
但仍然可以使用ForkJoinPool
.public static ExecutorService newWorkStealingPool()
Creates a work-stealing thread pool using all available processors as its target parallelism level.
默认以CPU核数为参数。
这四种机制互为补充。根据您要控制的粒度级别,您必须选择正确的粒度级别。