为什么在使用多线程统计大文件的词频时答案会出现变化?
Why is there variation in the answer when using multiple threads to count word frequencies in a large file?
我的目标是在使用多线程读取大文件的同时统计每个词出现的频率。
我正在实现 Runnable 接口来实现多线程。但是在执行程序时,我并不是每次都得到正确的答案。有时,它会给出正确的输出,有时则不会。但是使用Callable接口而不是Runnable,程序正确执行没有任何错误。
这是主类:
import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
public class WordFrequencyRunnableTest {
public static void main(String[] args) throws IOException {
long startTime = System.currentTimeMillis();
String filePath = "C:/Users/Mukesh Kumar/Desktop/data.txt";
WordFrequencyRunnableTest runnableTest = new WordFrequencyRunnableTest();
Map<String, Integer> wordFrequencies = runnableTest.parseLines(filePath);
runnableTest.printResult(wordFrequencies);
long elapsedTime = System.currentTimeMillis() - startTime;
System.out.println("Total execution time in millis: " + elapsedTime);
}
public Map<String, Integer> parseLines(String filePath) throws IOException {
Map<String, Integer> wordFrequencies = new HashMap<>();
try (BufferedReader bufferedReader = new BufferedReader(new FileReader(filePath))) {
String eachLine = bufferedReader.readLine();
while (eachLine != null) {
List<String> linesForEachThread = new ArrayList<>();
while (linesForEachThread.size() != 100 && eachLine != null) {
linesForEachThread.add(eachLine);
eachLine = bufferedReader.readLine();
}
WordFrequencyUsingRunnable task = new WordFrequencyUsingRunnable(linesForEachThread, wordFrequencies);
Thread thread = new Thread(task);
thread.start();
}
}
return wordFrequencies;
}
public void printResult(Map<String, Integer> wordFrequencies) {
wordFrequencies.forEach((key, value) -> System.out.println(key + " " + value));
}
}
这是处理逻辑的类:
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
/**
 * Task that counts the words of one batch of lines into a frequency map shared with other tasks.
 *
 * <p>Lines are lower-cased and split on runs of commas, periods and whitespace. All updates to the
 * shared map happen while holding the map's monitor, so several tasks may run concurrently against
 * the same map.
 */
public class WordFrequencyUsingRunnable implements Runnable {

    /**
     * Separator regex: one or more commas, periods or whitespace characters. The backslash is
     * doubled so the regex engine receives {@code \s}; a single backslash in the string literal
     * is a Java escape for a plain space (or a compile error before Java 15).
     */
    private static final String WORD_SEPARATORS = "[,.\\s]+";

    private final List<String> linesForEachThread;
    private final Map<String, Integer> wordFrequencies;

    /**
     * @param linesForEachThread lines this task should process; copied, so later changes to the
     *                           caller's list do not affect the task
     * @param wordFrequencies    shared map that receives the counts; it is used as the lock object
     */
    public WordFrequencyUsingRunnable(List<String> linesForEachThread, Map<String, Integer> wordFrequencies) {
        this.linesForEachThread = new ArrayList<>(linesForEachThread);
        this.wordFrequencies = wordFrequencies;
    }

    /** Splits every line into words and adds one to the shared count of each non-empty word. */
    @Override
    public void run() {
        for (String eachLine : linesForEachThread) {
            // Splitting happens outside the lock so only the map update is serialized.
            String[] eachLineWords = eachLine.toLowerCase().split(WORD_SEPARATORS);
            synchronized (wordFrequencies) {
                for (String eachWord : eachLineWords) {
                    // A blank line or a leading separator yields an empty token; it is not a word.
                    if (eachWord.isEmpty()) {
                        continue;
                    }
                    wordFrequencies.merge(eachWord, 1, Integer::sum);
                }
            }
        }
    }
}
我希望得到好的回应,并在此先感谢您的帮助。
您应该等待所有线程执行结束后再打印结果。
public class WordFrequencyRunnableTest {
List<Thread> threads = new ArrayList<>();
public static void main(String[] args) throws IOException {
...
...
Map<String, Integer> wordFrequencies = runnableTest.parseLines(filePath);
for(Thread thread: threads)
{
thread.join();
}
runnableTest.printResult(wordFrequencies);
...
...
}
public Map<String, Integer> parseLines(String filePath) throws IOException {
Map<String, Integer> wordFrequencies = new HashMap<>();
try (BufferedReader bufferedReader = new BufferedReader(new FileReader(filePath))) {
String eachLine = bufferedReader.readLine();
while (eachLine != null) {
List<String> linesForEachThread = new ArrayList<>();
while (linesForEachThread.size() != 100 && eachLine != null) {
linesForEachThread.add(eachLine);
eachLine = bufferedReader.readLine();
}
WordFrequencyUsingRunnable task = new WordFrequencyUsingRunnable(linesForEachThread, wordFrequencies);
Thread thread = new Thread(task);
thread.start();
threads.add(thread); // Add thread to the list.
}
}
return wordFrequencies;
}
}
PS - 您可以使用 ConcurrentHashMap<String, AtomicInteger>
来避免对 HashMap 的访问进行手动同步,这样程序会运行得更快。
我的目标是在使用多线程读取大文件的同时统计每个词出现的频率。 我正在实现 Runnable 接口来实现多线程。但是在执行程序时,我并不是每次都得到正确的答案。有时,它会给出正确的输出,有时则不会。但是使用Callable接口而不是Runnable,程序正确执行没有任何错误。
这是主类:
import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
public class WordFrequencyRunnableTest {
public static void main(String[] args) throws IOException {
long startTime = System.currentTimeMillis();
String filePath = "C:/Users/Mukesh Kumar/Desktop/data.txt";
WordFrequencyRunnableTest runnableTest = new WordFrequencyRunnableTest();
Map<String, Integer> wordFrequencies = runnableTest.parseLines(filePath);
runnableTest.printResult(wordFrequencies);
long elapsedTime = System.currentTimeMillis() - startTime;
System.out.println("Total execution time in millis: " + elapsedTime);
}
public Map<String, Integer> parseLines(String filePath) throws IOException {
Map<String, Integer> wordFrequencies = new HashMap<>();
try (BufferedReader bufferedReader = new BufferedReader(new FileReader(filePath))) {
String eachLine = bufferedReader.readLine();
while (eachLine != null) {
List<String> linesForEachThread = new ArrayList<>();
while (linesForEachThread.size() != 100 && eachLine != null) {
linesForEachThread.add(eachLine);
eachLine = bufferedReader.readLine();
}
WordFrequencyUsingRunnable task = new WordFrequencyUsingRunnable(linesForEachThread, wordFrequencies);
Thread thread = new Thread(task);
thread.start();
}
}
return wordFrequencies;
}
public void printResult(Map<String, Integer> wordFrequencies) {
wordFrequencies.forEach((key, value) -> System.out.println(key + " " + value));
}
}
这是处理逻辑的类:
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
/**
 * Task that counts the words of one batch of lines into a frequency map shared with other tasks.
 *
 * <p>Lines are lower-cased and split on runs of commas, periods and whitespace. All updates to the
 * shared map happen while holding the map's monitor, so several tasks may run concurrently against
 * the same map.
 */
public class WordFrequencyUsingRunnable implements Runnable {

    /**
     * Separator regex: one or more commas, periods or whitespace characters. The backslash is
     * doubled so the regex engine receives {@code \s}; a single backslash in the string literal
     * is a Java escape for a plain space (or a compile error before Java 15).
     */
    private static final String WORD_SEPARATORS = "[,.\\s]+";

    private final List<String> linesForEachThread;
    private final Map<String, Integer> wordFrequencies;

    /**
     * @param linesForEachThread lines this task should process; copied, so later changes to the
     *                           caller's list do not affect the task
     * @param wordFrequencies    shared map that receives the counts; it is used as the lock object
     */
    public WordFrequencyUsingRunnable(List<String> linesForEachThread, Map<String, Integer> wordFrequencies) {
        this.linesForEachThread = new ArrayList<>(linesForEachThread);
        this.wordFrequencies = wordFrequencies;
    }

    /** Splits every line into words and adds one to the shared count of each non-empty word. */
    @Override
    public void run() {
        for (String eachLine : linesForEachThread) {
            // Splitting happens outside the lock so only the map update is serialized.
            String[] eachLineWords = eachLine.toLowerCase().split(WORD_SEPARATORS);
            synchronized (wordFrequencies) {
                for (String eachWord : eachLineWords) {
                    // A blank line or a leading separator yields an empty token; it is not a word.
                    if (eachWord.isEmpty()) {
                        continue;
                    }
                    wordFrequencies.merge(eachWord, 1, Integer::sum);
                }
            }
        }
    }
}
我希望得到好的回应,并在此先感谢您的帮助。
您应该等待所有线程执行结束后再打印结果。
public class WordFrequencyRunnableTest {
List<Thread> threads = new ArrayList<>();
public static void main(String[] args) throws IOException {
...
...
Map<String, Integer> wordFrequencies = runnableTest.parseLines(filePath);
for(Thread thread: threads)
{
thread.join();
}
runnableTest.printResult(wordFrequencies);
...
...
}
public Map<String, Integer> parseLines(String filePath) throws IOException {
Map<String, Integer> wordFrequencies = new HashMap<>();
try (BufferedReader bufferedReader = new BufferedReader(new FileReader(filePath))) {
String eachLine = bufferedReader.readLine();
while (eachLine != null) {
List<String> linesForEachThread = new ArrayList<>();
while (linesForEachThread.size() != 100 && eachLine != null) {
linesForEachThread.add(eachLine);
eachLine = bufferedReader.readLine();
}
WordFrequencyUsingRunnable task = new WordFrequencyUsingRunnable(linesForEachThread, wordFrequencies);
Thread thread = new Thread(task);
thread.start();
threads.add(thread); // Add thread to the list.
}
}
return wordFrequencies;
}
}
PS - 您可以使用 ConcurrentHashMap<String, AtomicInteger>
来避免对 HashMap 的访问进行手动同步,这样程序会运行得更快。