如何配置单线程ForkJoinPool?

How to configure a single-threaded ForkJoinPool?

是否可以配置 ForkJoinPool 使用 1 个执行线程?

我正在执行在 ForkJoinPool 中调用 Random 的代码。每次运行时,我都会遇到不同的运行时行为,这使得调查回归变得困难。

我希望代码库提供 "debug" 和 "release" 模式。 "debug" 模式将使用固定种子配置 Random,并使用单个执行线程配置 ForkJoinPool。 "release" 模式将使用系统提供的 Random 种子并使用默认数量的 ForkJoinPool 线程。

我尝试将 ForkJoinPool 配置为并行度 1,但它使用 2 个线程(main 和第二个工作线程)。有什么想法吗?

主线程始终是您的应用程序将创建的第一个线程。因此,当您创建 ForkJoinPoolparallelism1 时,您正在创建另一个线程。实际上,现在应用程序中将有两个线程(因为您创建了 pool 个线程)。

如果您只需要一个主线程,您可以按顺序执行代码(而不是完全并行执行)。

原来我错了

当您配置 ForkJoinPool 并将 parallelism 设置为 1 时,只有一个线程执行任务。 main 线程在 ForkJoin.get() 上被阻塞。它实际上不执行任何任务。

也就是说,事实证明提供确定性行为确实很棘手。以下是我必须纠正的一些问题:

  • ForkJoinPool 正在使用不同的工作线程(具有不同的名称)执行任务,前提是工作线程空闲时间足够长。例如,如果主线程在调试断点处暂停,工作线程将变为空闲并关闭。当我恢复执行时,ForkJoinThread 将启动一个具有不同名称的新工作线程。为了解决这个问题,我不得不 provide a custom ForkJoinWorkerThreadFactory implementation that returns null if the ForkJoinPool already has a live worker (这可以防止池创建多个工人)。我还确保我的代码返回相同的 Random 实例,即使工作线程关闭并再次返回也是如此。
  • 具有 non-deterministic 迭代顺序的集合,例如 HashMapHashSet 导致元素在每个 运行 上以不同的顺序获取随机数。我使用 LinkedHashMapLinkedHashSet.
  • 更正了这个问题
  • 具有 non-deterministic hashCode() 实现的对象,例如 Enum.hashCode()。我忘记了这导致了什么问题,但我通过自己计算 hashCode() 而不是依赖 built-in 方法来纠正它。

这是 ForkJoinWorkerThreadFactory 的示例实现:

class MyForkJoinWorkerThread extends ForkJoinWorkerThread
{
    MyForkJoinWorkerThread(ForkJoinPool pool)
    {
        super(pool);
        // Change thread name after ForkJoinPool.registerWorker() does the same
        setName("DETERMINISTIC_WORKER");
    }
}

ForkJoinWorkerThreadFactory factory = new ForkJoinWorkerThreadFactory()
{
    private WeakReference<Thread> currentWorker = new WeakReference<>(null);

    @Override
    public synchronized ForkJoinWorkerThread newThread(ForkJoinPool pool)
    {
        // If the pool already has a live thread, wait for it to shut down.
        Thread thread = currentWorker.get();
        if (thread != null && thread.isAlive())
        {
            try
            {
                thread.join();
            }
            catch (InterruptedException e)
            {
                log.error("", e);
            }
        }
        ForkJoinWorkerThread result = new MyForkJoinWorkerThread(pool);
        currentWorker = new WeakReference<>(result);
        return result;
    }
};