ForkJoinPool - 为什么程序抛出 OutOfMemoryError?

ForkJoinPool - Why program is throwing OutOfMemoryError?

我想在 Java 8 中试用 ForkJoinPool,所以我编写了一个小程序来搜索给定目录中名称包含特定关键字的所有文件。

程序:

public class DirectoryService {

    public static void main(String[] args) {
        FileSearchRecursiveTask task = new FileSearchRecursiveTask("./DIR");
        ForkJoinPool pool = (ForkJoinPool) Executors.newWorkStealingPool();
        List<String> files = pool.invoke(task);
        pool.shutdown();
        System.out.println("Total  no of files with hello" + files.size());
    }

}

    class FileSearchRecursiveTask extends RecursiveTask<List<String>> {
        private String path;
        public FileSearchRecursiveTask(String path) {
            this.path = path;
        }

        @Override
        protected List<String> compute() {
            File mainDirectory = new File(path);
            List<String> filetedFileList = new ArrayList<>();
            List<FileSearchRecursiveTask> recursiveTasks = new ArrayList<>();
            if(mainDirectory.isDirectory()) {
                System.out.println(Thread.currentThread() + " - Directory is " + mainDirectory.getName());
                if(mainDirectory.canRead()) {
                    File[] fileList = mainDirectory.listFiles();
                    for(File file : fileList) {
                        System.out.println(Thread.currentThread() + "Looking into:" + file.getAbsolutePath());
                        if(file.isDirectory()) {
                            FileSearchRecursiveTask task = new FileSearchRecursiveTask(file.getAbsolutePath());
                            recursiveTasks.add(task);
                            task.fork();
                        } else {
                            if (file.getName().contains("hello")) {
                                System.out.println(file.getName());
                                filetedFileList.add(file.getName());
                            }
                        }
                    }
                }

                for(FileSearchRecursiveTask task : recursiveTasks) {
                  filetedFileList.addAll(task.join());
                }

        }
        return filetedFileList;

    }
}

当目录没有太多的子目录和文件时,这个程序工作正常,但如果它真的很大,那么它会抛出 OutOfMemoryError。

我的理解是最大线程数(包括补偿线程)是有限制的,所以为什么会出现这个错误?我的程序中是否缺少任何内容?

Caused by: java.lang.OutOfMemoryError: unable to create new native thread
at java.lang.Thread.start0(Native Method)
at java.lang.Thread.start(Thread.java:714)
at java.util.concurrent.ForkJoinPool.createWorker(ForkJoinPool.java:1486)
at java.util.concurrent.ForkJoinPool.tryCompensate(ForkJoinPool.java:2020)
at java.util.concurrent.ForkJoinPool.awaitJoin(ForkJoinPool.java:2057)
at java.util.concurrent.ForkJoinTask.doJoin(ForkJoinTask.java:390)
at java.util.concurrent.ForkJoinTask.join(ForkJoinTask.java:719)
at FileSearchRecursiveTask.compute(DirectoryService.java:51)
at FileSearchRecursiveTask.compute(DirectoryService.java:20)
at java.util.concurrent.RecursiveTask.exec(RecursiveTask.java:94)
at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
at java.util.concurrent.ForkJoinPool$WorkQueue.tryRemoveAndExec(ForkJoinPool.java:1107)
at java.util.concurrent.ForkJoinPool.awaitJoin(ForkJoinPool.java:2046)
at java.util.concurrent.ForkJoinTask.doJoin(ForkJoinTask.java:390)
at java.util.concurrent.ForkJoinTask.join(ForkJoinTask.java:719)
at FileSearchRecursiveTask.compute(DirectoryService.java:51)
at FileSearchRecursiveTask.compute(DirectoryService.java:20)
at java.util.concurrent.RecursiveTask.exec(RecursiveTask.java:94)
at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)   

只需稍作改动。 您需要为 newWorkStealingPool 指定并行度,如下所示:

ForkJoinPool pool = (ForkJoinPool) Executors.newWorkStealingPool(5);

根据其文档:

newWorkStealingPool(int parallelism) -> Creates a thread pool that maintains enough threads to support the given parallelism level, and may use multiple queues to reduce contention. The parallelism level corresponds to the maximum number of threads actively engaged in, or available to engage in, task processing. The actual number of threads may grow and shrink dynamically. A work-stealing pool makes no guarantees about the order in which submitted tasks are executed.

根据随附的 Java Visual VM 屏幕截图,这种并行性允许程序在指定的内存内工作,并且永远不会超出内存。

还有一件事(不知道会不会有效果):

更改调用 fork 和将任务添加到列表的顺序。即改变

FileSearchRecursiveTask task = new FileSearchRecursiveTask(file.getAbsolutePath());
recursiveTasks.add(task);
task.fork();

FileSearchRecursiveTask task = new FileSearchRecursiveTask(file.getAbsolutePath());
task.fork();
recursiveTasks.add(task);

你不应该分叉出面目全非的新任务。基本上,只要另一个工作线程有机会接管分叉作业并在本地进行评估,您就应该进行分叉。然后,一旦你分叉了一个任务,就不要马上调用 join() 。虽然底层框架将启动补偿线程以确保您的作业将继续进行,而不是让所有线程都被阻塞以等待子任务,但这将创建可能超出系统能力的大量线程。

这是您的代码的修订版:

public class DirectoryService {

    public static void main(String[] args) {
        FileSearchRecursiveTask task = new FileSearchRecursiveTask(new File("./DIR"));
        List<String> files = task.invoke();
        System.out.println("Total no of files with hello " + files.size());
    }

}

class FileSearchRecursiveTask extends RecursiveTask<List<String>> {
    private static final int TARGET_SURPLUS = 3;
    private File path;
    public FileSearchRecursiveTask(File file) {
        this.path = file;
    }

    @Override
    protected List<String> compute() {
        File directory = path;
        if(directory.isDirectory() && directory.canRead()) {
            System.out.println(Thread.currentThread() + " - Directory is " + directory.getName());
            return scan(directory);
        }
        return Collections.emptyList();
    }

    private List<String> scan(File directory)
    {
        File[] fileList = directory.listFiles();
        if(fileList == null || fileList.length == 0) return Collections.emptyList();
        List<FileSearchRecursiveTask> recursiveTasks = new ArrayList<>();
        List<String> filteredFileList = new ArrayList<>();
        for(File file: fileList) {
            System.out.println(Thread.currentThread() + "Looking into:" + file.getAbsolutePath());
            if(file.isDirectory())
            {
                if(getSurplusQueuedTaskCount() < TARGET_SURPLUS)
                {
                    FileSearchRecursiveTask task = new FileSearchRecursiveTask(file);
                    recursiveTasks.add(task);
                    task.fork();
                }
                else filteredFileList.addAll(scan(file));
            }
            else if(file.getName().contains("hello")) {
                filteredFileList.add(file.getAbsolutePath());
            }
        }

        for(int ix = recursiveTasks.size() - 1; ix >= 0; ix--) {
            FileSearchRecursiveTask task = recursiveTasks.get(ix);
            if(task.tryUnfork()) task.complete(scan(task.path));
        }

        for(FileSearchRecursiveTask task: recursiveTasks) {
            filteredFileList.addAll(task.join());
        }
        return filteredFileList;
    }
}

执行处理的方法已分解为接收目录作为参数的方法,因此我们可以在本地将其用于任意目录,不一定与 FileSearchRecursiveTask 实例相关联。

然后,该方法使用getSurplusQueuedTaskCount() 来确定尚未被其他工作线程拾取的本地排队任务的数量。确保有一些有助于工作平衡。但如果这个数字超过阈值,处理将在本地完成,而不会分叉更多作业。

本地处理后,迭代任务并使用tryUnfork()识别未被其他工作线程窃取的作业并在本地处理它们。向后迭代以从最年轻的工作开始这会增加找到一些工作的机会。

仅在之后,它 join() 包含所有子作业,这些子作业现在已经完成或正在由另一个工作线程处理。

请注意,我更改了启动代码以使用默认池。这使用“CPU 核心数”减去一个工作线程,加上启动线程,即本例中的 main 线程。