多线程帮助在 Java 中使用 ExecutorService
Multithreading help using ExecutorService in Java
我正在尝试搜索单词列表并查找多个文件中所有单词的总数。
我的逻辑是为每个文件设置单独的线程并获取计数。最后我可以汇总从每个线程获得的总数。
比如说,我有 50 个文件,每个 1MB。当我使用多线程时,性能没有提高。 FILE_THREAD_COUNT
我的总执行时间没有改善。当线程数为 1 或 50 时,我的执行时间几乎相同。
我是不是在使用执行器服务时做错了什么?
这是我的代码。
public void searchText(List<File> filesInPath, Set<String> searchWords) {
try {
BlockingQueue<File> filesBlockingQueue = new ArrayBlockingQueue<>(filesInPath.size());
filesBlockingQueue.addAll(filesInPath);
ExecutorService executorService = Executors.newFixedThreadPool(FILE_THREAD_COUNT);
int totalWordCount = 0;
while (!filesBlockingQueue.isEmpty()) {
Callable<Integer> task = () -> {
int wordCount = 0;
try {
File file = filesBlockingQueue.take();
try (BufferedReader bufferedReader = new BufferedReader(new FileReader(file))) {
String currentLine;
while ((currentLine = bufferedReader.readLine()) != null) {
String[] words = currentLine.split("\s+");
for (String word : words) {
for (String searchWord : searchWords) {
if (word.contains(searchWord)) {
wordCount++;
}
}
}
}
} catch (Exception e) {
// Handle error
}
} catch (Exception e) {
// Handle error
}
return wordCount;
};
totalWordCount += executorService.submit(task).get();
}
System.out.println("Final word count=" + totalWordCount);
executorService.shutdown();
} catch (Exception e) {
// Handle error
}
}
是的,你做错了什么。
问题出在这里:
executorService.submit(task).get()
您的代码提交一个任务然后等待它完成,这并没有实现任何并行;任务 运行 按顺序进行。而你的 BlockingQueue
没有任何价值。
并行运行个任务的方法是先提交所有个任务,收集Future
s 返回,然后 对所有这些调用 get()
。像这样:
List<Future<Integer>> futures = filesInPath.stream()
.map(<create your Callable>)
.map(executorService::submit)
.collect(toList());
for (Future future : futures)
totalWordCount += future.get();
}
你实际上可以在一个流中完成它,通过中间列表(如上所述)然后立即流式传输,但是你必须在一些代码中包装对 Future#get
的调用以捕获检查的例外 - 我将其留作 reader.
的练习
我正在尝试搜索单词列表并查找多个文件中所有单词的总数。
我的逻辑是为每个文件设置单独的线程并获取计数。最后我可以汇总从每个线程获得的总数。
比如说,我有 50 个文件,每个 1MB。当我使用多线程时,性能没有提高。 FILE_THREAD_COUNT
我的总执行时间没有改善。当线程数为 1 或 50 时,我的执行时间几乎相同。
我是不是在使用执行器服务时做错了什么?
这是我的代码。
public void searchText(List<File> filesInPath, Set<String> searchWords) {
try {
BlockingQueue<File> filesBlockingQueue = new ArrayBlockingQueue<>(filesInPath.size());
filesBlockingQueue.addAll(filesInPath);
ExecutorService executorService = Executors.newFixedThreadPool(FILE_THREAD_COUNT);
int totalWordCount = 0;
while (!filesBlockingQueue.isEmpty()) {
Callable<Integer> task = () -> {
int wordCount = 0;
try {
File file = filesBlockingQueue.take();
try (BufferedReader bufferedReader = new BufferedReader(new FileReader(file))) {
String currentLine;
while ((currentLine = bufferedReader.readLine()) != null) {
String[] words = currentLine.split("\s+");
for (String word : words) {
for (String searchWord : searchWords) {
if (word.contains(searchWord)) {
wordCount++;
}
}
}
}
} catch (Exception e) {
// Handle error
}
} catch (Exception e) {
// Handle error
}
return wordCount;
};
totalWordCount += executorService.submit(task).get();
}
System.out.println("Final word count=" + totalWordCount);
executorService.shutdown();
} catch (Exception e) {
// Handle error
}
}
是的,你做错了什么。
问题出在这里:
executorService.submit(task).get()
您的代码提交一个任务然后等待它完成,这并没有实现任何并行;任务 运行 按顺序进行。而你的 BlockingQueue
没有任何价值。
并行运行个任务的方法是先提交所有个任务,收集Future
s 返回,然后 对所有这些调用 get()
。像这样:
List<Future<Integer>> futures = filesInPath.stream()
.map(<create your Callable>)
.map(executorService::submit)
.collect(toList());
for (Future future : futures)
totalWordCount += future.get();
}
你实际上可以在一个流中完成它,通过中间列表(如上所述)然后立即流式传输,但是你必须在一些代码中包装对 Future#get
的调用以捕获检查的例外 - 我将其留作 reader.