多线程帮助在 Java 中使用 ExecutorService

Multithreading help using ExecutorService in Java

我正在尝试搜索单词列表并查找多个文件中所有单词的总数。

我的逻辑是为每个文件设置单独的线程并获取计数。最后我可以汇总从每个线程获得的总数。

比如说,我有 50 个文件,每个 1MB。当我使用多线程时,性能没有提高。 FILE_THREAD_COUNT 我的总执行时间没有改善。当线程数为 1 或 50 时,我的执行时间几乎相同。

我是不是在使用执行器服务时做错了什么?

这是我的代码。

public void searchText(List<File> filesInPath, Set<String> searchWords) {
    try {
        BlockingQueue<File> filesBlockingQueue = new ArrayBlockingQueue<>(filesInPath.size());
        filesBlockingQueue.addAll(filesInPath);

        ExecutorService executorService = Executors.newFixedThreadPool(FILE_THREAD_COUNT);
        int totalWordCount = 0;
        while (!filesBlockingQueue.isEmpty()) {
            Callable<Integer> task = () -> {
                int wordCount = 0;
                try {
                    File file = filesBlockingQueue.take();
                    try (BufferedReader bufferedReader = new BufferedReader(new FileReader(file))) {
                        String currentLine;
                        while ((currentLine = bufferedReader.readLine()) != null) {
                            String[] words = currentLine.split("\s+");
                            for (String word : words) {
                                for (String searchWord : searchWords) {
                                    if (word.contains(searchWord)) {
                                        wordCount++;
                                    }
                                }
                            }
                        }
                    } catch (Exception e) {
                        // Handle error
                    }
                } catch (Exception e) {
                    // Handle error
                }
                return wordCount;
            };
            totalWordCount += executorService.submit(task).get();
        }
        System.out.println("Final word count=" + totalWordCount);
        executorService.shutdown();
    } catch (Exception e) {
        // Handle error
    }
}

是的,你做错了什么。

问题出在这里:

executorService.submit(task).get()

您的代码提交一个任务然后等待它完成,这并没有实现任何并行;任务 运行 按顺序进行。而你的 BlockingQueue 没有任何价值。

并行运行个任务的方法是提交所有个任务,收集Future s 返回,然后 对所有这些调用 get()。像这样:

List<Future<Integer>> futures = filesInPath.stream()
    .map(<create your Callable>)
    .map(executorService::submit)
    .collect(toList());

for (Future future : futures)
    totalWordCount += future.get();
}

你实际上可以在一个流中完成它,通过中间列表(如上所述)然后立即流式传输,但是你必须在一些代码中包装对 Future#get 的调用以捕获检查的例外 - 我将其留作 reader.

的练习