Concurrency and Concurrent Data Structures

I'm practicing a bit of concurrency.

public class WordOccurrencesBigFile {
    private String words;

    private ConcurrentHashMap<String, Pair<String, Integer>> wordOccurrencesMap = new ConcurrentHashMap<>();

    public WordOccurrencesBigFile(String wordsLine) {
        this.words = wordsLine;
    }

    public void processWords() {
        parseWordsLines();

        printOrderAlphabetically();
        printOrderByCount();
        printByInsertionOrder();
    }

    private void parseWordsLines() {
        String[] wordsLinesArray = words.split("\n");

        ExecutorService executor = Executors.newFixedThreadPool(5);
        for(String wordsLine: wordsLinesArray) {
            executor.execute(() -> parseWords(wordsLine));
        }

        executor.shutdown();
        while (!executor.isTerminated()) {
        }
        System.out.println("Finished all threads");
    }

    private void parseWords(String wordsLine) {
        System.out.println(Thread.currentThread().getName() + " Start.");
        System.out.println(Thread.currentThread().getName() + " Processing line: '" + wordsLine + "'");
        String[] wordsArray = wordsLine.split(" ");

        synchronized(this){
            for (String word : wordsArray) {
                Pair<String, Integer> pair = null;
                if (!wordOccurrencesMap.containsKey(word)) {
                    pair = new Pair<>(word, 1);
                    //System.out.println(Thread.currentThread().getName() + " Creating Pair: " + pair);
                } else {
                    pair = wordOccurrencesMap.get(word);
                    pair.setValue(pair.getValue() + 1);
                    //System.out.println(Thread.currentThread().getName() + " Updating Pair: " + pair);
                }

                wordOccurrencesMap.put(word, pair);
            }
        }
        System.out.println(Thread.currentThread().getName() + " End.");
    }

    public static void main(String[] args) {
        String wordsLines = "bb cc aa ccc bb cc cc aa\n"+
            "bb cc aa ccc bb cc cc aa\n"+
            "bb cc aa ccc bb cc cc aa\n"+
            "bb cc aa ccc bb cc cc aa\n"+
            "bb cc aa ccc bb cc cc aa\n"+
            "bb cc aa ccc bb cc cc aa\n"+
            "bb cc aa ccc bb cc cc aa\n"+
            "bb cc aa ccc bb cc cc aa\n"+
            "bb cc aa ccc bb cc cc aa\n"+
            "bb cc aa ccc bb cc cc aa\n"+
            "bb cc aa ccc bb cc cc aa\n"+
            "bb cc aa ccc bb cc cc aa\n"+
            "bb cc aa ccc bb cc cc aa\n"+
            "bb cc aa ccc bb cc cc aa\n"+
            "bb cc aa ccc bb cc cc aa\n"+
            "bb cc aa ccc bb cc cc aa\n"+
            "bb cc aa ccc bb cc cc aa\n"+
            "bb cc aa ccc bb cc cc aa\n"+
            "bb cc aa ccc bb cc cc aa\n"+
            "bb cc aa ccc bb cc cc aa\n"+
            "bb cc aa ccc bb cc cc aa\n"+
            "bb cc aa ccc bb cc cc aa";

        WordOccurrencesBigFile wordOccurrences = new WordOccurrencesBigFile(wordsLines);
        wordOccurrences.processWords();
    }

}

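In parseWordsLines an ExecutorService is created with a pool of 5 threads, and the WordOccurrencesBigFile class is instantiated with a String containing multiple lines separated by "\n". The purpose is to have each line processed by a different thread and to insert the count of each unique word into the Map.

I thought that using a ConcurrentHashMap would be enough to handle the fact that multiple threads read from and write to the Map. But most of the time I run the class I get different counts (oddly, mainly for the word "bb").

Adding synchronized(this), however, solved the problem.

Can anyone explain why this behavior occurs, whether this is the best way to approach this problem, and whether I should pass "this" to the synchronized block or the object that the threads are accessing?

Thank you very much.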

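ConcurrentHashMap is thread-safe: it guarantees that each individual operation on it is thread-safe.

But a sequence of several operations, such as the following, is not atomic as a whole: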

            if (!wordOccurrencesMap.containsKey(word)) {
                pair = new Pair<>(word, 1);
                //System.out.println(Thread.currentThread().getName() + " Creating Pair: " + pair);
            } else {
                pair = wordOccurrencesMap.get(word);
                pair.setValue(pair.getValue() + 1);
                //System.out.println(Thread.currentThread().getName() + " Updating Pair: " + pair);
            }

            wordOccurrencesMap.put(word, pair);

You can use a single operation instead:

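// The lambda must return the value to store; setValue() may not return the Pair itself, so build a new Pair.
wordOccurrencesMap.compute(word,
        (s, pair) -> pair == null ?
                new Pair<>(word, 1) : new Pair<>(word, pair.getValue() + 1));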
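If the Pair wrapper is not actually needed, the same idea can be expressed with plain Integer values and ConcurrentHashMap.merge, which performs the whole "insert 1 or add 1" step as one atomic call per key. The following is only a sketch under that assumption: the class MergeWordCount and the method countWords are made-up names, and awaitTermination is used here in place of the busy-wait loop from the question.

```java
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical class, not part of the original question.
class MergeWordCount {

    // Counts the words of every line, one task per line, with no synchronized block.
    static Map<String, Integer> countWords(String text) {
        ConcurrentHashMap<String, Integer> counts = new ConcurrentHashMap<>();
        ExecutorService executor = Executors.newFixedThreadPool(5);
        for (String line : text.split("\n")) {
            executor.execute(() -> {
                for (String word : line.split(" ")) {
                    // Atomically stores 1 for a new key, or old value + 1 for an existing key.
                    counts.merge(word, 1, Integer::sum);
                }
            });
        }
        executor.shutdown();
        try {
            // Block until the submitted tasks finish instead of spinning on isTerminated().
            if (!executor.awaitTermination(1, TimeUnit.MINUTES)) {
                throw new IllegalStateException("word counting did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for workers", e);
        }
        return counts;
    }

    public static void main(String[] args) {
        String line = "bb cc aa ccc bb cc cc aa";
        String text = String.join("\n", Collections.nCopies(22, line));
        System.out.println(countWords(text));
    }
}
```

With the 22-line input from the question, the counts should come out as bb=44, cc=66, aa=44 and ccc=22 on every run; the order in which the map prints them is not specified.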

As @Thomas mentioned in the comments, your counter increment is not atomic, which means that

    if (!wordOccurrencesMap.containsKey(word)) {
        // <-- Two threads can both pass the check above and enter this block,
        //     so one of the increments is lost and the results differ between runs.

        pair = new Pair<>(word, 1);
        //System.out.println(Thread.currentThread().getName() + " Creating Pair: " + pair);
    }

See this post for more information, as it explains the same problem in detail.
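To make the interleaving concrete, here is a deterministic, single-threaded replay of the check-then-act sequence that two threads could execute. It is an illustration only: the class LostUpdateReplay is invented for this example, and Integer values stand in for the Pair objects from the question.

```java
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical class that replays one unlucky interleaving step by step.
class LostUpdateReplay {

    // Returns the count stored for "bb" after two logical increments.
    static int replay() {
        ConcurrentHashMap<String, Integer> map = new ConcurrentHashMap<>();

        // Step 1: "thread A" checks the map and finds no entry.
        boolean aSawKey = map.containsKey("bb");
        // Step 2: "thread B" checks before A has written, so it also finds no entry.
        boolean bSawKey = map.containsKey("bb");

        // Step 3: A acts on its check and stores a count of 1.
        map.put("bb", aSawKey ? map.get("bb") + 1 : 1);
        // Step 4: B acts on its stale check and overwrites the entry with 1 again.
        map.put("bb", bSawKey ? map.get("bb") + 1 : 1);

        return map.get("bb");
    }

    public static void main(String[] args) {
        System.out.println("count after two increments: " + replay());
    }
}
```

Each individual call on the map is thread-safe, yet the word was seen twice and the stored count is 1. That is the lost update behind the varying totals.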

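Well, adding synchronized(this) solves the problem, but you lose all the benefits of multithreading and parallelization, because only one thread at a time can update the map.

What you need is the computeIfAbsent method of ConcurrentMap. The body of your for loop would then become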

Pair<String, Integer> pair = wordOccurrencesMap.computeIfAbsent(word, w -> new Pair<>(w, 0));
synchronized(pair) {
    pair.setValue(pair.getValue()+1);
}

Now you can omit the synchronized(this) block.

Edit: you must still make sure that while one thread is calling pair.setValue(), no other thread can call pair.getValue(), as noted in the comments.
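One way to meet that requirement without locking on each Pair is to store a thread-safe counter as the map value. The sketch below assumes the Pair can be replaced by java.util.concurrent.atomic.LongAdder; the class AdderWordCount and its method names are illustrative and do not come from the question.

```java
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.LongAdder;

// Hypothetical class: one LongAdder per word instead of a mutable Pair.
class AdderWordCount {
    private final ConcurrentHashMap<String, LongAdder> counts = new ConcurrentHashMap<>();

    // Safe to call from many threads at once.
    void addLine(String line) {
        for (String word : line.split(" ")) {
            // computeIfAbsent creates the counter at most once per key;
            // increment() is itself thread-safe, so no synchronized block is needed.
            counts.computeIfAbsent(word, w -> new LongAdder()).increment();
        }
    }

    // Returns 0 for words that were never seen.
    long count(String word) {
        LongAdder adder = counts.get(word);
        return adder == null ? 0 : adder.sum();
    }

    public static void main(String[] args) {
        AdderWordCount counter = new AdderWordCount();
        List<String> lines = Collections.nCopies(22, "bb cc aa ccc bb cc cc aa");
        // forEach on a parallel stream returns only after every line has been processed.
        lines.parallelStream().forEach(counter::addLine);
        System.out.println("bb = " + counter.count("bb"));
    }
}
```

AtomicInteger with incrementAndGet() would work the same way; LongAdder is simply aimed at counters that many threads update frequently.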

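I'll try to add my two cents on the "best way to approach this problem" part of the question:

  • You have a printByInsertionOrder() method. If you process the text in parallel without keeping some kind of position data, you will not be able to maintain this order.
  • If you are given text written by humans, you will run into a whole lot of trouble with the wordsLine.split(" ") pattern. I suggest using "[^\p{L}0-9]+" instead, which matches "at least one character that is not a Unicode letter or a digit". \p{L} is especially powerful because it also matches accented letters such as é, è or Ö.

My suggestion is to avoid explicit concurrency wherever possible and to focus on functional idioms, for example: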
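A small sketch of the difference between the two split patterns. The class SplitPatterns and the sample sentences are made up for this example; the accented letters are written as Unicode escapes so the source compiles regardless of file encoding, and the backslash of \p{L} has to be doubled inside a Java string literal.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical helper comparing the two ways of splitting a line into words.
class SplitPatterns {

    // The pattern from the question: punctuation stays glued to the words.
    static List<String> splitOnSpaces(String line) {
        return Arrays.asList(line.split(" "));
    }

    // Splits on every run of characters that are neither Unicode letters nor ASCII digits.
    static List<String> splitOnNonWord(String line) {
        return Arrays.asList(line.split("[^\\p{L}0-9]+"));
    }

    public static void main(String[] args) {
        // English sample followed by a French phrase containing accented letters.
        String line = "Hello, world! O\u00f9 est l'\u00e9t\u00e9?";
        System.out.println(splitOnSpaces(line));
        System.out.println(splitOnNonWord(line));
    }
}
```

With the space pattern, "Hello," and "world!" keep their punctuation and would be counted separately from "Hello" and "world". Note that split can still return a leading empty string when a line starts with a delimiter, so filtering out empty tokens is worthwhile.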

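import java.awt.Point;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class SplitTest {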

    public static void main(String[] args) {

        String text = "On the parseWordsLines an ExecutorService is created with a pool of 5 \n threads, and the WordOccurrencesBigFile class is instantiated with String with multiple lines created by \"\n\". The purpose is to have each line being processed by a different threads and insert on the Map the count of unique words.";
        List<Entry<String, Point>> allWordsWithPositions = splitLines(text).entrySet().stream()
                .parallel()
                .flatMap(SplitTest::splitWordToStream)
                .collect(Collectors.toList());
        allWordsWithPositions.forEach(w -> System.out.println(w.getKey() + ":" + w.getValue()));
    }

    private static Map<String, Integer> splitLines(String text) {
        String[] split = text.split("\n");
        HashMap<String, Integer> lineMap = new HashMap<>();
        for (int i = 0; i < split.length; ++i) {
            lineMap.put(split[i], i);
        }
        return lineMap;
    }

    private static Stream<Entry<String, Point>> splitWordToStream(Entry<String, Integer> lineEntry) {
        return splitWords(lineEntry.getValue(), lineEntry.getKey()).entrySet().stream();
    }

    private static Map<String, Point> splitWords(Integer lineNumber, String line) {
        // The backslash must be doubled in a Java string literal, otherwise this does not compile.
        String[] split = line.split("[^\\p{L}0-9]+");
        HashMap<String, Point> wordMap = new HashMap<>();
        for (int i = 0; i < split.length; ++i) {
            wordMap.put(split[i], new Point(i, lineNumber));
        }
        return wordMap;
    }
}

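Now you have a list containing the properly split words together with their positions in the text, and it was produced in parallel without headaches. You only need to post-process the result to get all the representations you need :) Note that because both helper methods use a HashMap keyed by the line or word text, identical lines and words repeated within a line collapse into a single entry; collect lists of entries instead if you need every occurrence.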
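That post-processing step might look roughly like the following. It is a sketch only, assuming the words have already been gathered into a List<String> in text order (positions are left out to keep it short); the class WordReport and its methods are hypothetical and merely mirror the three print methods named in the question.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.Function;
import java.util.stream.Collectors;

// Hypothetical reporting helper built on a sequential stream.
class WordReport {

    // LinkedHashMap keeps the words in order of first appearance.
    static Map<String, Long> byInsertionOrder(List<String> words) {
        return words.stream().collect(Collectors.groupingBy(
                Function.identity(), LinkedHashMap::new, Collectors.counting()));
    }

    // TreeMap sorts the keys in natural (alphabetical) order.
    static Map<String, Long> alphabetically(List<String> words) {
        return new TreeMap<>(byInsertionOrder(words));
    }

    // Highest count first; ties keep their first-appearance order because the sort is stable.
    static List<Map.Entry<String, Long>> byCount(List<String> words) {
        return byInsertionOrder(words).entrySet().stream()
                .sorted(Map.Entry.<String, Long>comparingByValue().reversed())
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<String> words = Arrays.asList("bb cc aa ccc bb cc cc aa".split(" "));
        System.out.println(byInsertionOrder(words)); // {bb=2, cc=3, aa=2, ccc=1}
        System.out.println(alphabetically(words));   // {aa=2, bb=2, cc=3, ccc=1}
        System.out.println(byCount(words));          // [cc=3, bb=2, aa=2, ccc=1]
    }
}
```

Keeping the counting step sequential here is deliberate: it is what makes the first-appearance order reliable, which ties back to the first bullet point above.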