使用 ConcurrentHashMap 处理数据不一致
Data inconsistency using ConcurrentHashMap
对于同一组文件,每个 运行 的计数都会发生变化。
下面的代码仍然数据不一致。如何保证线程安全?简单的字数统计代码。
package ConcurrentHashMapDemo;
import java.io.BufferedReader;
import java.io.File;
import java.io.FileReader;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
class FileReaderTask implements Runnable {
private String filePath;
private String fileName;
private ConcurrentMap<String, Integer> wordCountMap;
public FileReaderTask(String filePath, String fileName,
ConcurrentMap<String, Integer> wordCountMap) {
this.filePath = filePath;
this.fileName = fileName;
this.wordCountMap = wordCountMap;
}
public void run() {
File jobFile = new File(filePath + fileName);
try {
BufferedReader bReader = new BufferedReader(new FileReader(jobFile));
String line = "";
while ((line = bReader.readLine()) != null) {
String[] strArray = line.split(" ");
for (String str : strArray) {
if (wordCountMap.containsKey(str)) {
wordCountMap.replace (str.trim(),
wordCountMap.get(str.trim()) + 1);
} else {
wordCountMap.putIfAbsent(str.trim(), 1);
}
}
}
//Thread.sleep(10000);
} catch (Exception e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
public class Main {
public static void main(String[] args) {
ConcurrentMap<String, Integer> wordCountMap = new ConcurrentHashMap<String, Integer>();
File fileDir = new File("c://job_files");
Thread[] threads = new Thread[fileDir.listFiles().length];
for(int i=0;i<threads.length;i++){
FileReaderTask frt = new FileReaderTask("c:/job_files/", fileDir.listFiles()[i].getName(), wordCountMap);
threads[i]= new Thread(frt);
threads[i].start();
}
//
for(int i=0;i<threads.length;i++){
try {
threads[i].join();
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
for(Map.Entry<String, Integer> entry: wordCountMap.entrySet()){
String key = entry.getKey();
System.out.println(key +" - - "+wordCountMap.get(key));
}
System.out.println("Main");
}
}
并发容器确保内部一致性(例如,不会两次添加相同的键),但它们不会保护存储的值。您的代码现在具有竞争条件。另一个线程可以在您调用 get
和调用 replace
之间递增计数器。 replace
然后将错误的值放入映射中,丢失了另一个线程执行的增量。
你需要让你的增量原子化。像这样的东西,它使用 replace
的版本,确保映射中的值在执行替换之前仍然相同:
str = str.trim();
while(true) {
Integer oldValue = wordCountMap.putIfAbsent(str, 1);
if(oldValue != null) {
if(wordCountMap.replace(str, oldValue, oldValue + 1))
break; // Successfully incremented the existing count
} else {
break; // Added new count of 1
}
}
对于同一组文件,每个 运行 的计数都会发生变化。 下面的代码仍然数据不一致。如何保证线程安全?简单的字数统计代码。
package ConcurrentHashMapDemo;
import java.io.BufferedReader;
import java.io.File;
import java.io.FileReader;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
class FileReaderTask implements Runnable {
private String filePath;
private String fileName;
private ConcurrentMap<String, Integer> wordCountMap;
public FileReaderTask(String filePath, String fileName,
ConcurrentMap<String, Integer> wordCountMap) {
this.filePath = filePath;
this.fileName = fileName;
this.wordCountMap = wordCountMap;
}
public void run() {
File jobFile = new File(filePath + fileName);
try {
BufferedReader bReader = new BufferedReader(new FileReader(jobFile));
String line = "";
while ((line = bReader.readLine()) != null) {
String[] strArray = line.split(" ");
for (String str : strArray) {
if (wordCountMap.containsKey(str)) {
wordCountMap.replace (str.trim(),
wordCountMap.get(str.trim()) + 1);
} else {
wordCountMap.putIfAbsent(str.trim(), 1);
}
}
}
//Thread.sleep(10000);
} catch (Exception e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
public class Main {
public static void main(String[] args) {
ConcurrentMap<String, Integer> wordCountMap = new ConcurrentHashMap<String, Integer>();
File fileDir = new File("c://job_files");
Thread[] threads = new Thread[fileDir.listFiles().length];
for(int i=0;i<threads.length;i++){
FileReaderTask frt = new FileReaderTask("c:/job_files/", fileDir.listFiles()[i].getName(), wordCountMap);
threads[i]= new Thread(frt);
threads[i].start();
}
//
for(int i=0;i<threads.length;i++){
try {
threads[i].join();
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
for(Map.Entry<String, Integer> entry: wordCountMap.entrySet()){
String key = entry.getKey();
System.out.println(key +" - - "+wordCountMap.get(key));
}
System.out.println("Main");
}
}
并发容器确保内部一致性(例如,不会两次添加相同的键),但它们不会保护存储的值。您的代码现在具有竞争条件。另一个线程可以在您调用 get
和调用 replace
之间递增计数器。 replace
然后将错误的值放入映射中,丢失了另一个线程执行的增量。
你需要让你的增量原子化。像这样的东西,它使用 replace
的版本,确保映射中的值在执行替换之前仍然相同:
str = str.trim();
while(true) {
Integer oldValue = wordCountMap.putIfAbsent(str, 1);
if(oldValue != null) {
if(wordCountMap.replace(str, oldValue, oldValue + 1))
break; // Successfully incremented the existing count
} else {
break; // Added new count of 1
}
}