Concurrency and Concurrent Data Structures
public class WordOccurrencesBigFile {
private String words;
private ConcurrentHashMap<String, Pair<String, Integer>> wordOccurrencesMap = new ConcurrentHashMap<>();
public WordOccurrencesBigFile(String wordsLine) {
this.words = wordsLine;
public void processWords() {
private void parseWordsLines() {
String[] wordsLinesArray = words.split("\n");
ExecutorService executor = Executors.newFixedThreadPool(5);
for(String wordsLine: wordsLinesArray) {
executor.execute(() -> parseWords(wordsLine));
while (!executor.isTerminated()) {
System.out.println("Finished all threads");
private void parseWords(String wordsLine) {
System.out.println(Thread.currentThread().getName() + " Start.");
System.out.println(Thread.currentThread().getName() + " Processing line: '" + wordsLine + "'");
String[] wordsArray = wordsLine.split(" ");
for (String word : wordsArray) {
Pair<String, Integer> pair = null;
if (!wordOccurrencesMap.containsKey(word)) {
pair = new Pair<>(word, 1);
//System.out.println(Thread.currentThread().getName() + " Creating Pair: " + pair);
} else {
pair = wordOccurrencesMap.get(word);
pair.setValue(pair.getValue() + 1);
//System.out.println(Thread.currentThread().getName() + " Updating Pair: " + pair);
wordOccurrencesMap.put(word, pair);
System.out.println(Thread.currentThread().getName() + " End.");
public static void main(String[] args) {
String wordsLines = "bb cc aa ccc bb cc cc aa\n"+
"bb cc aa ccc bb cc cc aa\n"+
"bb cc aa ccc bb cc cc aa\n"+
"bb cc aa ccc bb cc cc aa\n"+
"bb cc aa ccc bb cc cc aa\n"+
"bb cc aa ccc bb cc cc aa\n"+
"bb cc aa ccc bb cc cc aa\n"+
"bb cc aa ccc bb cc cc aa\n"+
"bb cc aa ccc bb cc cc aa\n"+
"bb cc aa ccc bb cc cc aa\n"+
"bb cc aa ccc bb cc cc aa\n"+
"bb cc aa ccc bb cc cc aa\n"+
"bb cc aa ccc bb cc cc aa\n"+
"bb cc aa ccc bb cc cc aa\n"+
"bb cc aa ccc bb cc cc aa\n"+
"bb cc aa ccc bb cc cc aa\n"+
"bb cc aa ccc bb cc cc aa\n"+
"bb cc aa ccc bb cc cc aa\n"+
"bb cc aa ccc bb cc cc aa\n"+
"bb cc aa ccc bb cc cc aa\n"+
"bb cc aa ccc bb cc cc aa\n"+
"bb cc aa ccc bb cc cc aa";
WordOccurrencesBigFile wordOccurrences = new
在 parseWordsLines 上创建了一个 ExecutorService,它有一个 5 线程池,WordOccurrencesBigFile class 是用带有由“\n”创建的多行的字符串实例化的。目的是让每一行都由不同的线程处理,并在地图上插入唯一单词的计数。
我原以为通过使用 ConcurrentHashMap 就足以处理我有多个线程读取和写入地图的事实。但是我在执行 class 的大多数时候得到不同的计数。 (奇怪主要是针对"bb"字。
谁能解释一下为什么会出现这种行为,这是解决这个问题的最佳方法,我应该将 "this" 传递给同步块还是线程正在访问的对象?
if (!wordOccurrencesMap.containsKey(word)) {
pair = new Pair<>(word, 1);
//System.out.println(Thread.currentThread().getName() + " Creating Pair: " + pair);
} else {
pair = wordOccurrencesMap.get(word);
pair.setValue(pair.getValue() + 1);
//System.out.println(Thread.currentThread().getName() + " Updating Pair: " + pair);
wordOccurrencesMap.put(word, pair);
(s, pair) -> pair == null ?
new Pair<>(word, 1) : pair.setValue(pair.getValue() + 1));
正如@Thomas 在评论中提到的,你的计数器增量不是原子的,这意味着,
if (!wordOccurrencesMap.containsKey(word)) {
------>// two threads can enter in this block and results will be different.
pair = new Pair<>(word, 1);
//System.out.println(Thread.currentThread().getName() + " Creating Pair: " + pair);
查看 this post 了解更多信息,因为它详细解释了同一问题。
好吧,添加 synchronized(this)
方法。所以你的 for
Pair<String, Integer> pair = wordOccurrencesMap.computeIfAbsent(word, w -> new Pair<>(w, 0));
synchronized(pair) {
现在您可以省略 synchronized(this)
编辑:但是您必须确保当第一个线程调用 pair.setValue() 时,另一个线程不能调用 pair.getValue(),如评论所述。
我会尝试将我的两分钱添加到问题的 "best way to approach this problem" 部分:
- 您有一个 printByInsertionOrder() 方法。如果您在不保留某种位置数据的情况下并行处理文本,您将无法维持此顺序。
- 如果您得到的是人工编写的文本,您将遇到使用
wordsLine.split(" ")
拆分模式的一大堆麻烦。我建议改用 "[^\p{L}0-9]+"
,它匹配 "at least one character not beeing a unicode letter or a digit"。 \p{L}
特别强大,因为它还可以匹配 é、è 或 Ö 等变音符号。
public class SplitTest {
public static void main(String[] args) {
String text = "On the parseWordsLines an ExecutorService is created with a pool of 5 \n threads, and the WordOccurrencesBigFile class is instantiated with String with multiple lines created by \"\n\". The purpose is to have each line being processed by a different threads and insert on the Map the count of unique words.";
List<Entry<String, Point>> allWordsWithPositions = splitLines(text).entrySet().stream()
allWordsWithPositions.forEach(w -> System.out.println(w.getKey() + ":" + w.getValue()));
private static Map<String, Integer> splitLines(String text) {
String[] split = text.split("\n");
HashMap<String, Integer> lineMap = new HashMap<>();
for (int i = 0; i < split.length; ++i) {
lineMap.put(split[i], i);
return lineMap;
private static Stream<Entry<String, Point>> splitWordToStream(Entry<String, Integer> lineEntry) {
return splitWords(lineEntry.getValue(), lineEntry.getKey()).entrySet().stream();
private static Map<String, Point> splitWords(Integer lineNumber, String line) {
String[] split = line.split("[^\p{L}0-9]+");
HashMap<String, Point> wordMap = new HashMap<>();
for (int i = 0; i < split.length; ++i) {
wordMap.put(split[i], new Point(i, lineNumber));
return wordMap;
现在您有一个列表,其中包含正确拆分的所有单词以及它们在文本中的位置。并没有头痛地并行完成。您只需要 post- 处理结果以获得您需要的所有表示:)
public class WordOccurrencesBigFile {
private String words;
private ConcurrentHashMap<String, Pair<String, Integer>> wordOccurrencesMap = new ConcurrentHashMap<>();
public WordOccurrencesBigFile(String wordsLine) {
this.words = wordsLine;
public void processWords() {
private void parseWordsLines() {
String[] wordsLinesArray = words.split("\n");
ExecutorService executor = Executors.newFixedThreadPool(5);
for(String wordsLine: wordsLinesArray) {
executor.execute(() -> parseWords(wordsLine));
while (!executor.isTerminated()) {
System.out.println("Finished all threads");
private void parseWords(String wordsLine) {
System.out.println(Thread.currentThread().getName() + " Start.");
System.out.println(Thread.currentThread().getName() + " Processing line: '" + wordsLine + "'");
String[] wordsArray = wordsLine.split(" ");
for (String word : wordsArray) {
Pair<String, Integer> pair = null;
if (!wordOccurrencesMap.containsKey(word)) {
pair = new Pair<>(word, 1);
//System.out.println(Thread.currentThread().getName() + " Creating Pair: " + pair);
} else {
pair = wordOccurrencesMap.get(word);
pair.setValue(pair.getValue() + 1);
//System.out.println(Thread.currentThread().getName() + " Updating Pair: " + pair);
wordOccurrencesMap.put(word, pair);
System.out.println(Thread.currentThread().getName() + " End.");
public static void main(String[] args) {
String wordsLines = "bb cc aa ccc bb cc cc aa\n"+
"bb cc aa ccc bb cc cc aa\n"+
"bb cc aa ccc bb cc cc aa\n"+
"bb cc aa ccc bb cc cc aa\n"+
"bb cc aa ccc bb cc cc aa\n"+
"bb cc aa ccc bb cc cc aa\n"+
"bb cc aa ccc bb cc cc aa\n"+
"bb cc aa ccc bb cc cc aa\n"+
"bb cc aa ccc bb cc cc aa\n"+
"bb cc aa ccc bb cc cc aa\n"+
"bb cc aa ccc bb cc cc aa\n"+
"bb cc aa ccc bb cc cc aa\n"+
"bb cc aa ccc bb cc cc aa\n"+
"bb cc aa ccc bb cc cc aa\n"+
"bb cc aa ccc bb cc cc aa\n"+
"bb cc aa ccc bb cc cc aa\n"+
"bb cc aa ccc bb cc cc aa\n"+
"bb cc aa ccc bb cc cc aa\n"+
"bb cc aa ccc bb cc cc aa\n"+
"bb cc aa ccc bb cc cc aa\n"+
"bb cc aa ccc bb cc cc aa\n"+
"bb cc aa ccc bb cc cc aa";
WordOccurrencesBigFile wordOccurrences = new
在 parseWordsLines 上创建了一个 ExecutorService,它有一个 5 线程池,WordOccurrencesBigFile class 是用带有由“\n”创建的多行的字符串实例化的。目的是让每一行都由不同的线程处理,并在地图上插入唯一单词的计数。
我原以为通过使用 ConcurrentHashMap 就足以处理我有多个线程读取和写入地图的事实。但是我在执行 class 的大多数时候得到不同的计数。 (奇怪主要是针对"bb"字。
谁能解释一下为什么会出现这种行为,这是解决这个问题的最佳方法,我应该将 "this" 传递给同步块还是线程正在访问的对象?
if (!wordOccurrencesMap.containsKey(word)) {
pair = new Pair<>(word, 1);
//System.out.println(Thread.currentThread().getName() + " Creating Pair: " + pair);
} else {
pair = wordOccurrencesMap.get(word);
pair.setValue(pair.getValue() + 1);
//System.out.println(Thread.currentThread().getName() + " Updating Pair: " + pair);
wordOccurrencesMap.put(word, pair);
(s, pair) -> pair == null ?
new Pair<>(word, 1) : pair.setValue(pair.getValue() + 1));
正如@Thomas 在评论中提到的,你的计数器增量不是原子的,这意味着,
if (!wordOccurrencesMap.containsKey(word)) {
------>// two threads can enter in this block and results will be different.
pair = new Pair<>(word, 1);
//System.out.println(Thread.currentThread().getName() + " Creating Pair: " + pair);
查看 this post 了解更多信息,因为它详细解释了同一问题。
好吧,添加 synchronized(this)
方法。所以你的 for
Pair<String, Integer> pair = wordOccurrencesMap.computeIfAbsent(word, w -> new Pair<>(w, 0));
synchronized(pair) {
现在您可以省略 synchronized(this)
编辑:但是您必须确保当第一个线程调用 pair.setValue() 时,另一个线程不能调用 pair.getValue(),如评论所述。
我会尝试将我的两分钱添加到问题的 "best way to approach this problem" 部分:
- 您有一个 printByInsertionOrder() 方法。如果您在不保留某种位置数据的情况下并行处理文本,您将无法维持此顺序。
- 如果您得到的是人工编写的文本,您将遇到使用
wordsLine.split(" ")
,它匹配 "at least one character not beeing a unicode letter or a digit"。\p{L}
特别强大,因为它还可以匹配 é、è 或 Ö 等变音符号。
public class SplitTest {
public static void main(String[] args) {
String text = "On the parseWordsLines an ExecutorService is created with a pool of 5 \n threads, and the WordOccurrencesBigFile class is instantiated with String with multiple lines created by \"\n\". The purpose is to have each line being processed by a different threads and insert on the Map the count of unique words.";
List<Entry<String, Point>> allWordsWithPositions = splitLines(text).entrySet().stream()
allWordsWithPositions.forEach(w -> System.out.println(w.getKey() + ":" + w.getValue()));
private static Map<String, Integer> splitLines(String text) {
String[] split = text.split("\n");
HashMap<String, Integer> lineMap = new HashMap<>();
for (int i = 0; i < split.length; ++i) {
lineMap.put(split[i], i);
return lineMap;
private static Stream<Entry<String, Point>> splitWordToStream(Entry<String, Integer> lineEntry) {
return splitWords(lineEntry.getValue(), lineEntry.getKey()).entrySet().stream();
private static Map<String, Point> splitWords(Integer lineNumber, String line) {
String[] split = line.split("[^\p{L}0-9]+");
HashMap<String, Point> wordMap = new HashMap<>();
for (int i = 0; i < split.length; ++i) {
wordMap.put(split[i], new Point(i, lineNumber));
return wordMap;
现在您有一个列表,其中包含正确拆分的所有单词以及它们在文本中的位置。并没有头痛地并行完成。您只需要 post- 处理结果以获得您需要的所有表示:)