使用 ConcurrentHashMap 处理数据不一致

Data inconsistency using ConcurrentHashMap

对于同一组文件,每个 运行 的计数都会发生变化。 下面的代码仍然数据不一致。如何保证线程安全?简单的字数统计代码。

package ConcurrentHashMapDemo;

import java.io.BufferedReader;
import java.io.File;
import java.io.FileReader;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

class FileReaderTask implements Runnable {
    private String filePath;
    private String fileName;
    private ConcurrentMap<String, Integer> wordCountMap;

    public FileReaderTask(String filePath, String fileName,
            ConcurrentMap<String, Integer> wordCountMap) {
        this.filePath = filePath;
        this.fileName = fileName;
        this.wordCountMap = wordCountMap;
    }

    public void run() {
        File jobFile = new File(filePath + fileName);
        try {
            BufferedReader bReader = new BufferedReader(new FileReader(jobFile));
            String line = "";
            while ((line = bReader.readLine()) != null) {
                String[] strArray = line.split(" ");
                for (String str : strArray) {
                    if (wordCountMap.containsKey(str)) {
                        wordCountMap.replace (str.trim(),
                                wordCountMap.get(str.trim()) + 1);
                    } else {
                        wordCountMap.putIfAbsent(str.trim(), 1);
                    }
                }
            }
            //Thread.sleep(10000);
        } catch (Exception e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
    }
}

public class Main {
    public static void main(String[] args) {
        ConcurrentMap<String, Integer> wordCountMap = new ConcurrentHashMap<String, Integer>();
        File fileDir = new File("c://job_files");
        Thread[] threads = new Thread[fileDir.listFiles().length];
        for(int i=0;i<threads.length;i++){
            FileReaderTask frt = new FileReaderTask("c:/job_files/", fileDir.listFiles()[i].getName(), wordCountMap);
            threads[i]= new Thread(frt);
            threads[i].start();
        }
        //
        for(int i=0;i<threads.length;i++){
        try {
        threads[i].join();
        } catch (InterruptedException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
        }

        for(Map.Entry<String, Integer> entry: wordCountMap.entrySet()){
            String key = entry.getKey();
            System.out.println(key +" - - "+wordCountMap.get(key));
        }
        System.out.println("Main");
    }
}

并发容器确保内部一致性(例如,不会两次添加相同的键),但它们不会保护存储的值。您的代码现在具有竞争条件。另一个线程可以在您调用 get 和调用 replace 之间递增计数器。 replace 然后将错误的值放入映射中,丢失了另一个线程执行的增量。

你需要让你的增量原子化。像这样的东西,它使用 replace 的版本,确保映射中的值在执行替换之前仍然相同:

str = str.trim();
while(true) {
    Integer oldValue = wordCountMap.putIfAbsent(str, 1);
    if(oldValue != null) {
        if(wordCountMap.replace(str, oldValue, oldValue + 1))
          break; // Successfully incremented the existing count
    } else {
        break; // Added new count of 1
    }
}