由于限制,灵活的 CountDownLatch 无法使用 Phaser
Flexible CountDownLatch can't use Phaser because of limit
我收到一个大文件,有 N 个条目。对于每个条目,我正在创建一个新线程。我需要等待所有N个线程被终止。
一开始我使用的是 Phaser,但它的实现仅限于 65K 方。所以,正在爆炸,因为 N 可能像 100K。
然后,我尝试了 CountDownLatch。这很好用,非常简单的概念和非常简单的实现。但是我不知道N个数
Phaser 是我的解决方案,但它有这个限制。
有什么想法吗?
这个post是相关的:
Flexible CountDownLatch?
使用 AtomicInteger,您可以轻松实现相同的目的。用 1 初始化并随着每个新线程递增。一旦在 worker 和 producer 中完成,递减并获取。如果为零运行你整理Runnable.
听起来您要解决的问题是尽快处理大量任务并等待处理完成。
同时处理大量任务的问题在于,它可能会导致过多的上下文切换,并且会实质上削弱您的机器并使处理速度降低到一定数量(取决于硬件)的并发线程之上。这意味着您需要对正在执行的并发工作线程设置上限。
Phaser 和 CountDownLatch 都是同步原语,它们的目的是提供对关键代码块的访问控制,而不是管理并行执行。
我会使用 Executor service in this case. It supports the addition of tasks (in many forms, including Runnable).
您可以使用 Executors class. I'd recommend using a fixed size thread pool 轻松创建一个 ExecutorService
,最大线程数为 20-100 - 这取决于您的任务 CPU 密集程度。任务所需的计算能力越强,在不严重降低性能的情况下可以处理的并行线程数就越少。
有多种方法可以等待所有任务完成:
- 收集
submit
方法返回的所有 Future
个实例,然后简单地对所有实例调用 get。这确保每个任务都在循环结束时执行。
- Shut down the executor service and wait for all the submitted tasks to finish。这种方法的缺点是您必须指定等待任务完成的最长时间。此外,它不太优雅,您并不总是想关闭
Executor
,这取决于您是在编写单次应用程序还是在之后保留 运行 的服务器 - 以防万一服务器应用程序,您肯定必须使用以前的方法。
最后,这是一个说明所有这些的代码片段:
List<TaskFromFile> tasks = loadFileAndCreateTasks();
ExecutorService executor = Executors.newFixedThreadPool(50);
for(TaskFromFile task : tasks) {
// createRunnable is not necessary in case your task implements Runnable
executor.submit(createRunnable(task));
}
// assuming single-shot batch job
executor.shutdown();
executor.awaitTermination(MAX_WAIT_TIME_MILLIS, TimeUnit.MILLISECONDS);
ReusableCountLatch
是一个 CountDownLatch
替代方案,它也允许递增。
用法如下:
ReusableCountLatch latch = new ReusableCountLatch(); // creates latch with initial count 0
ReusableCountLatch latch = new ReusableCountLatch(10); // creates latch with initial count 10
latch.increment(); // increments counter
latch.decrement(); // decrement counter
latch.waitTillZero(); // blocks until counts falls to zero
boolean succeeded = latch.waitTillZero(200, MILLISECONDS); // waits for up to 200 milliseconds until count falls to zero
int count = latch.getCount(); // gets actual count
要使用它,只需将此 gradle/maven 依赖项添加到您的项目中:'com.github.matejtymes:javafixes:1.3.1'
可以在此处找到更多详细信息:https://github.com/MatejTymes/JavaFixes
我收到一个大文件,有 N 个条目。对于每个条目,我正在创建一个新线程。我需要等待所有N个线程被终止。
一开始我使用的是 Phaser,但它的实现仅限于 65K 方。所以,正在爆炸,因为 N 可能像 100K。
然后,我尝试了 CountDownLatch。这很好用,非常简单的概念和非常简单的实现。但是我不知道N个数
Phaser 是我的解决方案,但它有这个限制。
有什么想法吗?
这个post是相关的: Flexible CountDownLatch?
使用 AtomicInteger,您可以轻松实现相同的目的。用 1 初始化并随着每个新线程递增。一旦在 worker 和 producer 中完成,递减并获取。如果为零运行你整理Runnable.
听起来您要解决的问题是尽快处理大量任务并等待处理完成。
同时处理大量任务的问题在于,它可能会导致过多的上下文切换,并且会实质上削弱您的机器并使处理速度降低到一定数量(取决于硬件)的并发线程之上。这意味着您需要对正在执行的并发工作线程设置上限。
Phaser 和 CountDownLatch 都是同步原语,它们的目的是提供对关键代码块的访问控制,而不是管理并行执行。
我会使用 Executor service in this case. It supports the addition of tasks (in many forms, including Runnable).
您可以使用 Executors class. I'd recommend using a fixed size thread pool 轻松创建一个 ExecutorService
,最大线程数为 20-100 - 这取决于您的任务 CPU 密集程度。任务所需的计算能力越强,在不严重降低性能的情况下可以处理的并行线程数就越少。
有多种方法可以等待所有任务完成:
- 收集
submit
方法返回的所有Future
个实例,然后简单地对所有实例调用 get。这确保每个任务都在循环结束时执行。 - Shut down the executor service and wait for all the submitted tasks to finish。这种方法的缺点是您必须指定等待任务完成的最长时间。此外,它不太优雅,您并不总是想关闭
Executor
,这取决于您是在编写单次应用程序还是在之后保留 运行 的服务器 - 以防万一服务器应用程序,您肯定必须使用以前的方法。
最后,这是一个说明所有这些的代码片段:
List<TaskFromFile> tasks = loadFileAndCreateTasks();
ExecutorService executor = Executors.newFixedThreadPool(50);
for(TaskFromFile task : tasks) {
// createRunnable is not necessary in case your task implements Runnable
executor.submit(createRunnable(task));
}
// assuming single-shot batch job
executor.shutdown();
executor.awaitTermination(MAX_WAIT_TIME_MILLIS, TimeUnit.MILLISECONDS);
ReusableCountLatch
是一个 CountDownLatch
替代方案,它也允许递增。
用法如下:
ReusableCountLatch latch = new ReusableCountLatch(); // creates latch with initial count 0
ReusableCountLatch latch = new ReusableCountLatch(10); // creates latch with initial count 10
latch.increment(); // increments counter
latch.decrement(); // decrement counter
latch.waitTillZero(); // blocks until counts falls to zero
boolean succeeded = latch.waitTillZero(200, MILLISECONDS); // waits for up to 200 milliseconds until count falls to zero
int count = latch.getCount(); // gets actual count
要使用它,只需将此 gradle/maven 依赖项添加到您的项目中:'com.github.matejtymes:javafixes:1.3.1'
可以在此处找到更多详细信息:https://github.com/MatejTymes/JavaFixes