如果 ES 上的项目可以重新提交给 ES,我怎么知道 ExecutorService 何时完成
How do I know when ExecutorService has finished if items on the ES can resubmit to the ES
我的 Java 应用程序处理文件夹中的音乐文件,它旨在并行且独立地处理多个文件夹。为此,每个文件夹都由 ExecutorService 处理,该服务的最大池大小与计算机的 CPU 相匹配。
例如,如果我们有8-CPU台电脑那么可以(理论上)同时处理8个文件夹,如果我们有16-CPU台电脑那么可以同时处理16个文件夹.如果我们只有 1 个 CPU,那么我们将 pool-size 设置为 3,以允许 CPU 在一个文件夹在 I/O.
上被阻止时继续执行某些操作
但是,我们实际上并没有只有一个 ExecutorService,我们有多个,因为每个文件夹都可以经历多个阶段。
Process1(使用ExecutorService1)→ Process2(ExecutorService2)→ Process3(ExecutorService3)
进程 1、2、3 等都实现了 Callable,并且都有自己关联的 ExecutorService。我们启动了一个 FileLoader 进程,它加载文件夹,然后为每个文件夹创建一个 Process1 可调用对象并提交给 Process1 执行程序,对于每个 Process1 可调用对象,它将完成其工作,然后提交给不同的可调用对象,这可能是 Process2、Process3等等,但我们永远不会倒退,例如 Process3 永远不会提交给 Process1。
我们实际上有 12 个进程,但任何特定文件夹都不太可能经历所有 12 个进程
但我意识到这是有缺陷的,因为在 16-CPU 计算机的情况下,每个 ES 的池大小可以为 16,所以我们实际上有 48 个线程 运行,这将只是引起了太多争论。
所以我要做的是让所有进程(Process1、Process2...)使用相同的 ExecutorService,这样我们就只有匹配 CPUs 的工作线程。
但是,在我目前的情况下,我们有一个 SongLoader 进程,它只提交了一个任务(加载所有文件夹),然后我们调用 shutdown(),直到所有内容都提交给 Process0,这才会完成,然后 Process0 上的 shutdown() 将不会成功,直到所有内容都发送到 Process1 等等。
//Init Services
services.add(songLoaderService);
services.add(Process1.getExecutorService());
services.add(Process2.getExecutorService());
services.add(Process3.getExecutorService());
for (ExecutorService service : services)
//Request Shutdown
service.shutdown();
//Now wait for all submitted tasks to complete
service.awaitTermination(10, TimeUnit.DAYS);
}
//...............
//Finish Off work
但是,如果一切都在同一个 ES 上并且 Process1 正在提交给 Process2,这将不再有效,因为当时调用 shutdown() 并不是 Process1 会提交给 Process2 的所有文件夹,因此它会过早关闭.
那么,当该 ES 上的任务可以提交给同一 ES 上的其他任务时,我如何检测何时使用单个 ExecutorService 完成所有工作?
或者有更好的方法吗?
注意,你可能会想他为什么不把Process1,2 & 3的逻辑合并成一个Process。困难在于,虽然我最初是按文件夹对歌曲进行分组的,但有时歌曲会被分成更小的组,并且它们会被分配到不同的进程中,而不一定是同一个进程,实际上总共有 12 个进程。
基于 Sholms 想法的尝试
主线程
private static List<Future> futures = Collections.synchronizedList(new ArrayList<Future>());
private static AnalyserService analyserService = new MainAnalyserService(SongKongThreadGroup.THREAD_WORKER);
...
SongLoader loader = SongLoader.getInstanceOf(parentFolder);
ExecutorService songLoaderService = SongLoader.getExecutorService();
songLoaderService.submit(loader);
for(Future future : futures)
{
try
{
future.get();
}
catch (InterruptedException ie)
{
SongKong.logger.warning(">>>>>> Interrupted - shutting down tasks immediately");
getAnalyserService().getExecutorService().awaitTermination(30, TimeUnit.SECONDS);
}
catch(ExecutionException e)
{
SongKong.logger.log(Level.SEVERE, ">>>>>> ExecutionException:"+e.getMessage(), e);
}
}
songLoaderService.shutdown();
使用来自 MainAnalyserService
的函数提交新任务的流程代码
public void submit(Callable<Boolean> task) //throws Exception
{
FixSongsController.getFutures().add(getExecutorService().submit(task));
}
它看起来像是在工作,但失败了
java.util.ConcurrentModificationException
at java.base/java.util.ArrayList$Itr.checkForComodification(Unknown Source)
at java.base/java.util.ArrayList$Itr.next(Unknown Source)
at com.jthink.songkong.analyse.toplevelanalyzer.FixSongsController.start(FixSongsController.java:220)
at com.jthink.songkong.ui.swingworker.FixSongs.doInBackground(FixSongs.java:49)
at com.jthink.songkong.ui.swingworker.FixSongs.doInBackground(FixSongs.java:18)
at java.desktop/javax.swing.SwingWorker.call(Unknown Source)
at java.base/java.util.concurrent.FutureTask.run(Unknown Source)
at java.desktop/javax.swing.SwingWorker.run(Unknown Source)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
at java.base/java.lang.Thread.run(Unknown Source)
我现在松了一口气,我不能让一个线程调用 future.get()(等待完成),同时其他线程正在添加到列表中。
不要shutdown()
ExecutorService
。相反,创建 Callable
个对象并保留它们创建的 Future
个对象。
现在您可以等待 Future
个对象,而不是等待 ExecutorService
个对象。请注意,现在您将不得不分别等待每个未来的对象,但是如果您只需要知道最后一个对象何时完成,那么您也可以按任何给定的顺序迭代它们并调用 get()
.
任何任务都可以提交更多任务,并且需要确保将其未来对象放入主线程将监视的队列中。
// put these somewhere public
ConcurrentLinkedQueue<Future<Boolean>> futures = new ConcurrentLinkedQueue<Future<Boolean>>();
ExecutorService executor = ...
void submit(Callable<Boolean> c) {
futures.add(executor.submit(c));
}
现在您的主线程可以开始提交任务并等待所有任务和子任务:
void mainThread() {
// add some tasks from main thread
for(int i=0 ; i<N ; ++i){
Callable<Boolean> callable = new Callable<Boolean>() {
@Override
public Boolean call() throws Exception {
...
}
submit(callable);
}
Future<Boolean> head = null;
while((head=futures.poll()) != null){
try {
head.get();
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
}
// At this point, all of your tasks are complete including subtasks.
executor.shutdown();
executor.awaitTermination(); // should return almost immediately
}
我同意 Shloim 的观点,您在这里不需要多个 ExecutorService
实例 - 只需一个(根据您可用的 CPU 数量调整大小)就足够了,而且实际上是最佳的。其实我觉得你可能不需要ExecutorService
;如果您使用信号完整性的外部机制,一个简单的 Executor
就可以完成这项工作。
我会先构建一个 class 来表示整个较大的工作项。如果你需要消费每个子工作项的结果,你可以使用队列,但如果你只想知道是否还有工作要做,你只需要一个计数器。
例如,您可以这样做:
public class FolderWork implements Runnable {
private final Executor executor;
private final File folder;
private int pendingItems; // guarded by monitor lock on this instance
public FolderWork(Executor executor, File folder) {
this.executor = executor;
this.folder = folder;
}
@Override
public void run() {
for (File file : folder.listFiles()) {
enqueueMoreWork(file);
}
}
public synchronized void enqueueMoreWork(File file) {
pendingItems++;
executor.execute(new FileWork(file, this));
}
public synchronized void markWorkItemCompleted() {
pendingItems--;
notifyAll();
}
public synchronized boolean hasPendingWork() {
return pendingItems > 0;
}
public synchronized void awaitCompletion() {
while (pendingItems > 0) {
wait();
}
}
}
public class FileWork implements Runnable {
private final File file;
private final FolderWork parent;
public FileWork(File file, FolderWork parent) {
this.file = file;
this.parent = parent;
}
@Override
public void run() {
try {
// do some work with the file
if (/* found more work to do */) {
parent.enqueueMoreWork(...);
}
} finally {
parent.markWorkItemCompleted();
}
}
}
如果您担心 pendingItems
计数器的同步开销,您可以改用 AtomicInteger
。然后你需要一个单独的机制来通知等待线程我们已经完成了;例如,您可以使用 CountDownLatch
。这是一个示例实现:
public class FolderWork implements Runnable {
private final Executor executor;
private final File folder;
private final AtomicInteger pendingItems = new AtomicInteger(0);
private final CountDownLatch latch = new CountDownLatch(1);
public FolderWork(Executor executor, File folder) {
this.executor = executor;
this.folder = folder;
}
@Override
public void run() {
for (File file : folder.listFiles()) {
enqueueMoreWork(file);
}
}
public void enqueueMoreWork(File file) {
if (latch.getCount() == 0) {
throw new IllegalStateException(
"Cannot call enqueueMoreWork() again after awaitCompletion() returns!");
}
pendingItems.incrementAndGet();
executor.execute(new FileWork(file, this));
}
public void markWorkItemCompleted() {
int remainingItems = pendingItems.decrementAndGet();
if (remainingItems == 0) {
latch.countDown();
}
}
public boolean hasPendingWork() {
return pendingItems.get() > 0;
}
public void awaitCompletion() {
latch.await();
}
}
你可以这样称呼它:
Executor executor = Executors.newCachedThreadPool(...);
FolderWork topLevel = new FolderWork(executor, new File(...));
executor.execute(topLevel);
topLevel.awaitCompletion();
此示例仅显示一级子工作项,但您可以使用任意数量的子工作项,只要它们都使用相同的 pendingItems
计数器来跟踪剩余的工作量做。
这基本上是@DanielPrydens 的解决方案,但我对其进行了一些修改,以便更清楚地说明如何解决我的特定问题
创建了一个新的 class MainAnalyserService 来处理 ExecutorService 的创建并提供在新的可调用任务时进行计数的能力提交并完成时
public class MainAnalyserService
{
public static final int MIN_NUMBER_OF_WORKER_THREADS = 3;
protected static int BOUNDED_QUEUE_SIZE = 100;
private final AtomicInteger pendingItems = new AtomicInteger(0);
private final CountDownLatch latch = new CountDownLatch(1);
private static final int TIMEOUT_PER_TASK = 30;
protected ExecutorService executorService;
protected String threadGroup;
public MainAnalyserService(String threadGroup)
{
this.threadGroup=threadGroup;
initExecutorService();
}
protected void initExecutorService()
{
int workerSize = Runtime.getRuntime().availableProcessors();
//Even if only have single cpu we still have multithread so we dont just have single thread waiting on I/O
if(workerSize< MIN_NUMBER_OF_WORKER_THREADS)
{
workerSize = MIN_NUMBER_OF_WORKER_THREADS;
}
executorService = new TimeoutThreadPoolExecutor(workerSize,
new SongKongThreadFactory(threadGroup),
new LinkedBlockingQueue<Runnable>(BOUNDED_QUEUE_SIZE),
TIMEOUT_PER_TASK,
TimeUnit.MINUTES);
}
public void submit(Callable<Boolean> task) //throws Exception
{
executorService.submit(task);
pendingItems.incrementAndGet();
}
public void workDone()
{
int remainingItems = pendingItems.decrementAndGet();
if (remainingItems == 0)
{
latch.countDown();
}
}
public void awaitCompletion() throws InterruptedException{
latch.await();
}
}
在 FixSongsController 线程中我们有
analyserService = new MainAnalyserService(THREAD_WORKER);
//SongLoader uses CompletionService when calls LoadFolderWorkers so shutdown wont return until all initial folder submissions completed
ExecutorService songLoaderService = SongLoader.getExecutorService();
songLoaderService.submit(loader);
songLoaderService.shutdown();
//Wait for all aysnc tasks to complete
analyserService.awaitCompletion();
然后任何 Callable(例如 Process1、Process2 等)调用 submit() 提交新的 Callable 在 ExecutorService 上,然后它必须在完成时调用 workDone(),所以为了确保我这样做,我在 call() 中添加了一个 finally 块每个过程 class 方法
例如
public Boolean call()
{
try
{
//do stuff
//Possibly make multiple calls to
FixSongsController.getAnalyserService().submit();
}
finally
{
FixSongsController.getAnalyserService().workDone();
}
}
我的 Java 应用程序处理文件夹中的音乐文件,它旨在并行且独立地处理多个文件夹。为此,每个文件夹都由 ExecutorService 处理,该服务的最大池大小与计算机的 CPU 相匹配。
例如,如果我们有8-CPU台电脑那么可以(理论上)同时处理8个文件夹,如果我们有16-CPU台电脑那么可以同时处理16个文件夹.如果我们只有 1 个 CPU,那么我们将 pool-size 设置为 3,以允许 CPU 在一个文件夹在 I/O.
上被阻止时继续执行某些操作但是,我们实际上并没有只有一个 ExecutorService,我们有多个,因为每个文件夹都可以经历多个阶段。
Process1(使用ExecutorService1)→ Process2(ExecutorService2)→ Process3(ExecutorService3)
进程 1、2、3 等都实现了 Callable,并且都有自己关联的 ExecutorService。我们启动了一个 FileLoader 进程,它加载文件夹,然后为每个文件夹创建一个 Process1 可调用对象并提交给 Process1 执行程序,对于每个 Process1 可调用对象,它将完成其工作,然后提交给不同的可调用对象,这可能是 Process2、Process3等等,但我们永远不会倒退,例如 Process3 永远不会提交给 Process1。 我们实际上有 12 个进程,但任何特定文件夹都不太可能经历所有 12 个进程
但我意识到这是有缺陷的,因为在 16-CPU 计算机的情况下,每个 ES 的池大小可以为 16,所以我们实际上有 48 个线程 运行,这将只是引起了太多争论。
所以我要做的是让所有进程(Process1、Process2...)使用相同的 ExecutorService,这样我们就只有匹配 CPUs 的工作线程。
但是,在我目前的情况下,我们有一个 SongLoader 进程,它只提交了一个任务(加载所有文件夹),然后我们调用 shutdown(),直到所有内容都提交给 Process0,这才会完成,然后 Process0 上的 shutdown() 将不会成功,直到所有内容都发送到 Process1 等等。
//Init Services
services.add(songLoaderService);
services.add(Process1.getExecutorService());
services.add(Process2.getExecutorService());
services.add(Process3.getExecutorService());
for (ExecutorService service : services)
//Request Shutdown
service.shutdown();
//Now wait for all submitted tasks to complete
service.awaitTermination(10, TimeUnit.DAYS);
}
//...............
//Finish Off work
但是,如果一切都在同一个 ES 上并且 Process1 正在提交给 Process2,这将不再有效,因为当时调用 shutdown() 并不是 Process1 会提交给 Process2 的所有文件夹,因此它会过早关闭.
那么,当该 ES 上的任务可以提交给同一 ES 上的其他任务时,我如何检测何时使用单个 ExecutorService 完成所有工作?
或者有更好的方法吗?
注意,你可能会想他为什么不把Process1,2 & 3的逻辑合并成一个Process。困难在于,虽然我最初是按文件夹对歌曲进行分组的,但有时歌曲会被分成更小的组,并且它们会被分配到不同的进程中,而不一定是同一个进程,实际上总共有 12 个进程。
基于 Sholms 想法的尝试
主线程
private static List<Future> futures = Collections.synchronizedList(new ArrayList<Future>());
private static AnalyserService analyserService = new MainAnalyserService(SongKongThreadGroup.THREAD_WORKER);
...
SongLoader loader = SongLoader.getInstanceOf(parentFolder);
ExecutorService songLoaderService = SongLoader.getExecutorService();
songLoaderService.submit(loader);
for(Future future : futures)
{
try
{
future.get();
}
catch (InterruptedException ie)
{
SongKong.logger.warning(">>>>>> Interrupted - shutting down tasks immediately");
getAnalyserService().getExecutorService().awaitTermination(30, TimeUnit.SECONDS);
}
catch(ExecutionException e)
{
SongKong.logger.log(Level.SEVERE, ">>>>>> ExecutionException:"+e.getMessage(), e);
}
}
songLoaderService.shutdown();
使用来自 MainAnalyserService
的函数提交新任务的流程代码public void submit(Callable<Boolean> task) //throws Exception
{
FixSongsController.getFutures().add(getExecutorService().submit(task));
}
它看起来像是在工作,但失败了
java.util.ConcurrentModificationException
at java.base/java.util.ArrayList$Itr.checkForComodification(Unknown Source)
at java.base/java.util.ArrayList$Itr.next(Unknown Source)
at com.jthink.songkong.analyse.toplevelanalyzer.FixSongsController.start(FixSongsController.java:220)
at com.jthink.songkong.ui.swingworker.FixSongs.doInBackground(FixSongs.java:49)
at com.jthink.songkong.ui.swingworker.FixSongs.doInBackground(FixSongs.java:18)
at java.desktop/javax.swing.SwingWorker.call(Unknown Source)
at java.base/java.util.concurrent.FutureTask.run(Unknown Source)
at java.desktop/javax.swing.SwingWorker.run(Unknown Source)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
at java.base/java.lang.Thread.run(Unknown Source)
我现在松了一口气,我不能让一个线程调用 future.get()(等待完成),同时其他线程正在添加到列表中。
不要shutdown()
ExecutorService
。相反,创建 Callable
个对象并保留它们创建的 Future
个对象。
现在您可以等待 Future
个对象,而不是等待 ExecutorService
个对象。请注意,现在您将不得不分别等待每个未来的对象,但是如果您只需要知道最后一个对象何时完成,那么您也可以按任何给定的顺序迭代它们并调用 get()
.
任何任务都可以提交更多任务,并且需要确保将其未来对象放入主线程将监视的队列中。
// put these somewhere public
ConcurrentLinkedQueue<Future<Boolean>> futures = new ConcurrentLinkedQueue<Future<Boolean>>();
ExecutorService executor = ...
void submit(Callable<Boolean> c) {
futures.add(executor.submit(c));
}
现在您的主线程可以开始提交任务并等待所有任务和子任务:
void mainThread() {
// add some tasks from main thread
for(int i=0 ; i<N ; ++i){
Callable<Boolean> callable = new Callable<Boolean>() {
@Override
public Boolean call() throws Exception {
...
}
submit(callable);
}
Future<Boolean> head = null;
while((head=futures.poll()) != null){
try {
head.get();
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
}
// At this point, all of your tasks are complete including subtasks.
executor.shutdown();
executor.awaitTermination(); // should return almost immediately
}
我同意 Shloim 的观点,您在这里不需要多个 ExecutorService
实例 - 只需一个(根据您可用的 CPU 数量调整大小)就足够了,而且实际上是最佳的。其实我觉得你可能不需要ExecutorService
;如果您使用信号完整性的外部机制,一个简单的 Executor
就可以完成这项工作。
我会先构建一个 class 来表示整个较大的工作项。如果你需要消费每个子工作项的结果,你可以使用队列,但如果你只想知道是否还有工作要做,你只需要一个计数器。
例如,您可以这样做:
public class FolderWork implements Runnable {
private final Executor executor;
private final File folder;
private int pendingItems; // guarded by monitor lock on this instance
public FolderWork(Executor executor, File folder) {
this.executor = executor;
this.folder = folder;
}
@Override
public void run() {
for (File file : folder.listFiles()) {
enqueueMoreWork(file);
}
}
public synchronized void enqueueMoreWork(File file) {
pendingItems++;
executor.execute(new FileWork(file, this));
}
public synchronized void markWorkItemCompleted() {
pendingItems--;
notifyAll();
}
public synchronized boolean hasPendingWork() {
return pendingItems > 0;
}
public synchronized void awaitCompletion() {
while (pendingItems > 0) {
wait();
}
}
}
public class FileWork implements Runnable {
private final File file;
private final FolderWork parent;
public FileWork(File file, FolderWork parent) {
this.file = file;
this.parent = parent;
}
@Override
public void run() {
try {
// do some work with the file
if (/* found more work to do */) {
parent.enqueueMoreWork(...);
}
} finally {
parent.markWorkItemCompleted();
}
}
}
如果您担心 pendingItems
计数器的同步开销,您可以改用 AtomicInteger
。然后你需要一个单独的机制来通知等待线程我们已经完成了;例如,您可以使用 CountDownLatch
。这是一个示例实现:
public class FolderWork implements Runnable {
private final Executor executor;
private final File folder;
private final AtomicInteger pendingItems = new AtomicInteger(0);
private final CountDownLatch latch = new CountDownLatch(1);
public FolderWork(Executor executor, File folder) {
this.executor = executor;
this.folder = folder;
}
@Override
public void run() {
for (File file : folder.listFiles()) {
enqueueMoreWork(file);
}
}
public void enqueueMoreWork(File file) {
if (latch.getCount() == 0) {
throw new IllegalStateException(
"Cannot call enqueueMoreWork() again after awaitCompletion() returns!");
}
pendingItems.incrementAndGet();
executor.execute(new FileWork(file, this));
}
public void markWorkItemCompleted() {
int remainingItems = pendingItems.decrementAndGet();
if (remainingItems == 0) {
latch.countDown();
}
}
public boolean hasPendingWork() {
return pendingItems.get() > 0;
}
public void awaitCompletion() {
latch.await();
}
}
你可以这样称呼它:
Executor executor = Executors.newCachedThreadPool(...);
FolderWork topLevel = new FolderWork(executor, new File(...));
executor.execute(topLevel);
topLevel.awaitCompletion();
此示例仅显示一级子工作项,但您可以使用任意数量的子工作项,只要它们都使用相同的 pendingItems
计数器来跟踪剩余的工作量做。
这基本上是@DanielPrydens 的解决方案,但我对其进行了一些修改,以便更清楚地说明如何解决我的特定问题
创建了一个新的 class MainAnalyserService 来处理 ExecutorService 的创建并提供在新的可调用任务时进行计数的能力提交并完成时
public class MainAnalyserService
{
public static final int MIN_NUMBER_OF_WORKER_THREADS = 3;
protected static int BOUNDED_QUEUE_SIZE = 100;
private final AtomicInteger pendingItems = new AtomicInteger(0);
private final CountDownLatch latch = new CountDownLatch(1);
private static final int TIMEOUT_PER_TASK = 30;
protected ExecutorService executorService;
protected String threadGroup;
public MainAnalyserService(String threadGroup)
{
this.threadGroup=threadGroup;
initExecutorService();
}
protected void initExecutorService()
{
int workerSize = Runtime.getRuntime().availableProcessors();
//Even if only have single cpu we still have multithread so we dont just have single thread waiting on I/O
if(workerSize< MIN_NUMBER_OF_WORKER_THREADS)
{
workerSize = MIN_NUMBER_OF_WORKER_THREADS;
}
executorService = new TimeoutThreadPoolExecutor(workerSize,
new SongKongThreadFactory(threadGroup),
new LinkedBlockingQueue<Runnable>(BOUNDED_QUEUE_SIZE),
TIMEOUT_PER_TASK,
TimeUnit.MINUTES);
}
public void submit(Callable<Boolean> task) //throws Exception
{
executorService.submit(task);
pendingItems.incrementAndGet();
}
public void workDone()
{
int remainingItems = pendingItems.decrementAndGet();
if (remainingItems == 0)
{
latch.countDown();
}
}
public void awaitCompletion() throws InterruptedException{
latch.await();
}
}
在 FixSongsController 线程中我们有
analyserService = new MainAnalyserService(THREAD_WORKER);
//SongLoader uses CompletionService when calls LoadFolderWorkers so shutdown wont return until all initial folder submissions completed
ExecutorService songLoaderService = SongLoader.getExecutorService();
songLoaderService.submit(loader);
songLoaderService.shutdown();
//Wait for all aysnc tasks to complete
analyserService.awaitCompletion();
然后任何 Callable(例如 Process1、Process2 等)调用 submit() 提交新的 Callable 在 ExecutorService 上,然后它必须在完成时调用 workDone(),所以为了确保我这样做,我在 call() 中添加了一个 finally 块每个过程 class 方法
例如
public Boolean call()
{
try
{
//do stuff
//Possibly make multiple calls to
FixSongsController.getAnalyserService().submit();
}
finally
{
FixSongsController.getAnalyserService().workDone();
}
}