由于限制,灵活的 CountDownLatch 无法使用 Phaser

Flexible CountDownLatch can't use Phaser because of limit

我收到一个大文件,有 N 个条目。对于每个条目,我正在创建一个新线程。我需要等待所有N个线程被终止。

一开始我使用的是 Phaser,但它的实现仅限于 65K 方。所以,正在爆炸,因为 N 可能像 100K。

然后,我尝试了 CountDownLatch。这很好用,非常简单的概念和非常简单的实现。但是我不知道N个数

Phaser 是我的解决方案,但它有这个限制。

有什么想法吗?

这个post是相关的: Flexible CountDownLatch?

使用 AtomicInteger,您可以轻松实现相同的目的。用 1 初始化并随着每个新线程递增。一旦在 worker 和 producer 中完成,递减并获取。如果为零运行你整理Runnable.

听起来您要解决的问题是尽快处理大量任务并等待处理完成。

同时处理大量任务的问题在于,它可能会导致过多的上下文切换,并且会实质上削弱您的机器并使处理速度降低到一定数量(取决于硬件)的并发线程之上。这意味着您需要对正在执行的并发工作线程设置上限。

Phaser 和 CountDownLatch 都是同步原语,它们的目的是提供对关键代码块的访问控制,而不是管理并行执行。

我会使用 Executor service in this case. It supports the addition of tasks (in many forms, including Runnable).

您可以使用 Executors class. I'd recommend using a fixed size thread pool 轻松创建一个 ExecutorService,最大线程数为 20-100 - 这取决于您的任务 CPU 密集程度。任务所需的计算能力越强,在不严重降低性能的情况下可以处理的并行线程数就越少。

有多种方法可以等待所有任务完成:

  • 收集 submit 方法返回的所有 Future 个实例,然后简单地对所有实例调用 get。这确保每个任务都在循环结束时执行。
  • Shut down the executor service and wait for all the submitted tasks to finish。这种方法的缺点是您必须指定等待任务完成的最长时间。此外,它不太优雅,您并不总是想关闭 Executor,这取决于您是在编写单次应用程序还是在之后保留 运行 的服务器 - 以防万一服务器应用程序,您肯定必须使用以前的方法。

最后,这是一个说明所有这些的代码片段:

List<TaskFromFile> tasks = loadFileAndCreateTasks();
ExecutorService executor = Executors.newFixedThreadPool(50);

for(TaskFromFile task : tasks) {
    // createRunnable is not necessary in case your task implements Runnable
    executor.submit(createRunnable(task));
}

// assuming single-shot batch job
executor.shutdown();
executor.awaitTermination(MAX_WAIT_TIME_MILLIS, TimeUnit.MILLISECONDS);

ReusableCountLatch 是一个 CountDownLatch 替代方案,它也允许递增。

用法如下:

ReusableCountLatch latch = new ReusableCountLatch(); // creates latch with initial count 0
ReusableCountLatch latch = new ReusableCountLatch(10); // creates latch with initial count 10

latch.increment(); // increments counter

latch.decrement(); // decrement counter

latch.waitTillZero(); // blocks until counts falls to zero

boolean succeeded = latch.waitTillZero(200, MILLISECONDS); // waits for up to 200 milliseconds until count falls to zero

int count = latch.getCount(); // gets actual count

要使用它,只需将此 gradle/maven 依赖项添加到您的项目中:'com.github.matejtymes:javafixes:1.3.1'

可以在此处找到更多详细信息:https://github.com/MatejTymes/JavaFixes