并发和并发数据结构
Concurrency and Concurrent Data Structures
我正在练习一点并发。
public class WordOccurrencesBigFile {
private String words;
private ConcurrentHashMap<String, Pair<String, Integer>> wordOccurrencesMap = new ConcurrentHashMap<>();
public WordOccurrencesBigFile(String wordsLine) {
this.words = wordsLine;
}
public void processWords() {
parseWordsLines();
printOrderAlphabetically();
printOrderByCount();
printByInsertionOrder();
}
private void parseWordsLines() {
String[] wordsLinesArray = words.split("\n");
ExecutorService executor = Executors.newFixedThreadPool(5);
for(String wordsLine: wordsLinesArray) {
executor.execute(() -> parseWords(wordsLine));
}
executor.shutdown();
while (!executor.isTerminated()) {
}
System.out.println("Finished all threads");
}
private void parseWords(String wordsLine) {
System.out.println(Thread.currentThread().getName() + " Start.");
System.out.println(Thread.currentThread().getName() + " Processing line: '" + wordsLine + "'");
String[] wordsArray = wordsLine.split(" ");
synchronized(this){
for (String word : wordsArray) {
Pair<String, Integer> pair = null;
if (!wordOccurrencesMap.containsKey(word)) {
pair = new Pair<>(word, 1);
//System.out.println(Thread.currentThread().getName() + " Creating Pair: " + pair);
} else {
pair = wordOccurrencesMap.get(word);
pair.setValue(pair.getValue() + 1);
//System.out.println(Thread.currentThread().getName() + " Updating Pair: " + pair);
}
wordOccurrencesMap.put(word, pair);
}
}
System.out.println(Thread.currentThread().getName() + " End.");
}
public static void main(String[] args) {
String wordsLines = "bb cc aa ccc bb cc cc aa\n"+
"bb cc aa ccc bb cc cc aa\n"+
"bb cc aa ccc bb cc cc aa\n"+
"bb cc aa ccc bb cc cc aa\n"+
"bb cc aa ccc bb cc cc aa\n"+
"bb cc aa ccc bb cc cc aa\n"+
"bb cc aa ccc bb cc cc aa\n"+
"bb cc aa ccc bb cc cc aa\n"+
"bb cc aa ccc bb cc cc aa\n"+
"bb cc aa ccc bb cc cc aa\n"+
"bb cc aa ccc bb cc cc aa\n"+
"bb cc aa ccc bb cc cc aa\n"+
"bb cc aa ccc bb cc cc aa\n"+
"bb cc aa ccc bb cc cc aa\n"+
"bb cc aa ccc bb cc cc aa\n"+
"bb cc aa ccc bb cc cc aa\n"+
"bb cc aa ccc bb cc cc aa\n"+
"bb cc aa ccc bb cc cc aa\n"+
"bb cc aa ccc bb cc cc aa\n"+
"bb cc aa ccc bb cc cc aa\n"+
"bb cc aa ccc bb cc cc aa\n"+
"bb cc aa ccc bb cc cc aa";
WordOccurrencesBigFile wordOccurrences = new
WordOccurrencesBigFile(wordsLines);
wordOccurrences.processWords();
}
}
在 parseWordsLines 上创建了一个 ExecutorService,它有一个 5 线程池,WordOccurrencesBigFile class 是用带有由“\n”创建的多行的字符串实例化的。目的是让每一行都由不同的线程处理,并在地图上插入唯一单词的计数。
我原以为通过使用 ConcurrentHashMap 就足以处理我有多个线程读取和写入地图的事实。但是我在执行 class 的大多数时候得到不同的计数。 (奇怪主要是针对"bb"字。
但是添加同步(this)问题就解决了。
谁能解释一下为什么会出现这种行为,这是解决这个问题的最佳方法,我应该将 "this" 传递给同步块还是线程正在访问的对象?
非常感谢。
ConcurrentHashMap
是线程安全的,可以保证每一个操作都是线程安全的。
但是这些操作不是原子的:
if (!wordOccurrencesMap.containsKey(word)) {
pair = new Pair<>(word, 1);
//System.out.println(Thread.currentThread().getName() + " Creating Pair: " + pair);
} else {
pair = wordOccurrencesMap.get(word);
pair.setValue(pair.getValue() + 1);
//System.out.println(Thread.currentThread().getName() + " Updating Pair: " + pair);
}
wordOccurrencesMap.put(word, pair);
您可以改为使用单个操作:
wordOccurrencesMap.compute(word,
(s, pair) -> pair == null ?
new Pair<>(word, 1) : pair.setValue(pair.getValue() + 1));
正如@Thomas 在评论中提到的,你的计数器增量不是原子的,这意味着,
if (!wordOccurrencesMap.containsKey(word)) {
------>// two threads can enter in this block and results will be different.
pair = new Pair<>(word, 1);
//System.out.println(Thread.currentThread().getName() + " Creating Pair: " + pair);
}
查看 this post 了解更多信息,因为它详细解释了同一问题。
好吧,添加 synchronized(this)
可以解决问题,但是您将失去多线程和并行化带来的所有好处。
你需要的是ConcurrentMap
的computeIfAbsent
方法。所以你的 for
循环的主体将转换为
Pair<String, Integer> pair = wordOccurrencesMap.computeIfAbsent(word, w -> new Pair<>(w, 0));
synchronized(pair) {
pair.setValue(pair.getValue()+1);
}
现在您可以省略 synchronized(this)
块。
编辑:但是您必须确保当第一个线程调用 pair.setValue() 时,另一个线程不能调用 pair.getValue(),如评论所述。
我会尝试将我的两分钱添加到问题的 "best way to approach this problem" 部分:
- 您有一个 printByInsertionOrder() 方法。如果您在不保留某种位置数据的情况下并行处理文本,您将无法维持此顺序。
- 如果您得到的是人工编写的文本,您将遇到使用
wordsLine.split(" ")
拆分模式的一大堆麻烦。我建议改用 "[^\p{L}0-9]+"
,它匹配 "at least one character not beeing a unicode letter or a digit"。 \p{L}
特别强大,因为它还可以匹配 é、è 或 Ö 等变音符号。
我的建议是尽可能避免显式并发,并专注于函数式习语,例如:
public class SplitTest {
public static void main(String[] args) {
String text = "On the parseWordsLines an ExecutorService is created with a pool of 5 \n threads, and the WordOccurrencesBigFile class is instantiated with String with multiple lines created by \"\n\". The purpose is to have each line being processed by a different threads and insert on the Map the count of unique words.";
List<Entry<String, Point>> allWordsWithPositions = splitLines(text).entrySet().stream()
.parallel()
.flatMap(SplitTest::splitWordToStream)
.collect(Collectors.toList());
allWordsWithPositions.forEach(w -> System.out.println(w.getKey() + ":" + w.getValue()));
}
private static Map<String, Integer> splitLines(String text) {
String[] split = text.split("\n");
HashMap<String, Integer> lineMap = new HashMap<>();
for (int i = 0; i < split.length; ++i) {
lineMap.put(split[i], i);
}
return lineMap;
}
private static Stream<Entry<String, Point>> splitWordToStream(Entry<String, Integer> lineEntry) {
return splitWords(lineEntry.getValue(), lineEntry.getKey()).entrySet().stream();
}
private static Map<String, Point> splitWords(Integer lineNumber, String line) {
String[] split = line.split("[^\p{L}0-9]+");
HashMap<String, Point> wordMap = new HashMap<>();
for (int i = 0; i < split.length; ++i) {
wordMap.put(split[i], new Point(i, lineNumber));
}
return wordMap;
}
}
现在您有一个列表,其中包含正确拆分的所有单词以及它们在文本中的位置。并没有头痛地并行完成。您只需要 post- 处理结果以获得您需要的所有表示:)
我正在练习一点并发。
public class WordOccurrencesBigFile {
private String words;
private ConcurrentHashMap<String, Pair<String, Integer>> wordOccurrencesMap = new ConcurrentHashMap<>();
public WordOccurrencesBigFile(String wordsLine) {
this.words = wordsLine;
}
public void processWords() {
parseWordsLines();
printOrderAlphabetically();
printOrderByCount();
printByInsertionOrder();
}
private void parseWordsLines() {
String[] wordsLinesArray = words.split("\n");
ExecutorService executor = Executors.newFixedThreadPool(5);
for(String wordsLine: wordsLinesArray) {
executor.execute(() -> parseWords(wordsLine));
}
executor.shutdown();
while (!executor.isTerminated()) {
}
System.out.println("Finished all threads");
}
private void parseWords(String wordsLine) {
System.out.println(Thread.currentThread().getName() + " Start.");
System.out.println(Thread.currentThread().getName() + " Processing line: '" + wordsLine + "'");
String[] wordsArray = wordsLine.split(" ");
synchronized(this){
for (String word : wordsArray) {
Pair<String, Integer> pair = null;
if (!wordOccurrencesMap.containsKey(word)) {
pair = new Pair<>(word, 1);
//System.out.println(Thread.currentThread().getName() + " Creating Pair: " + pair);
} else {
pair = wordOccurrencesMap.get(word);
pair.setValue(pair.getValue() + 1);
//System.out.println(Thread.currentThread().getName() + " Updating Pair: " + pair);
}
wordOccurrencesMap.put(word, pair);
}
}
System.out.println(Thread.currentThread().getName() + " End.");
}
public static void main(String[] args) {
String wordsLines = "bb cc aa ccc bb cc cc aa\n"+
"bb cc aa ccc bb cc cc aa\n"+
"bb cc aa ccc bb cc cc aa\n"+
"bb cc aa ccc bb cc cc aa\n"+
"bb cc aa ccc bb cc cc aa\n"+
"bb cc aa ccc bb cc cc aa\n"+
"bb cc aa ccc bb cc cc aa\n"+
"bb cc aa ccc bb cc cc aa\n"+
"bb cc aa ccc bb cc cc aa\n"+
"bb cc aa ccc bb cc cc aa\n"+
"bb cc aa ccc bb cc cc aa\n"+
"bb cc aa ccc bb cc cc aa\n"+
"bb cc aa ccc bb cc cc aa\n"+
"bb cc aa ccc bb cc cc aa\n"+
"bb cc aa ccc bb cc cc aa\n"+
"bb cc aa ccc bb cc cc aa\n"+
"bb cc aa ccc bb cc cc aa\n"+
"bb cc aa ccc bb cc cc aa\n"+
"bb cc aa ccc bb cc cc aa\n"+
"bb cc aa ccc bb cc cc aa\n"+
"bb cc aa ccc bb cc cc aa\n"+
"bb cc aa ccc bb cc cc aa";
WordOccurrencesBigFile wordOccurrences = new
WordOccurrencesBigFile(wordsLines);
wordOccurrences.processWords();
}
}
在 parseWordsLines 上创建了一个 ExecutorService,它有一个 5 线程池,WordOccurrencesBigFile class 是用带有由“\n”创建的多行的字符串实例化的。目的是让每一行都由不同的线程处理,并在地图上插入唯一单词的计数。
我原以为通过使用 ConcurrentHashMap 就足以处理我有多个线程读取和写入地图的事实。但是我在执行 class 的大多数时候得到不同的计数。 (奇怪主要是针对"bb"字。
但是添加同步(this)问题就解决了。
谁能解释一下为什么会出现这种行为,这是解决这个问题的最佳方法,我应该将 "this" 传递给同步块还是线程正在访问的对象?
非常感谢。
ConcurrentHashMap
是线程安全的,可以保证每一个操作都是线程安全的。
但是这些操作不是原子的:
if (!wordOccurrencesMap.containsKey(word)) {
pair = new Pair<>(word, 1);
//System.out.println(Thread.currentThread().getName() + " Creating Pair: " + pair);
} else {
pair = wordOccurrencesMap.get(word);
pair.setValue(pair.getValue() + 1);
//System.out.println(Thread.currentThread().getName() + " Updating Pair: " + pair);
}
wordOccurrencesMap.put(word, pair);
您可以改为使用单个操作:
wordOccurrencesMap.compute(word,
(s, pair) -> pair == null ?
new Pair<>(word, 1) : pair.setValue(pair.getValue() + 1));
正如@Thomas 在评论中提到的,你的计数器增量不是原子的,这意味着,
if (!wordOccurrencesMap.containsKey(word)) {
------>// two threads can enter in this block and results will be different.
pair = new Pair<>(word, 1);
//System.out.println(Thread.currentThread().getName() + " Creating Pair: " + pair);
}
查看 this post 了解更多信息,因为它详细解释了同一问题。
好吧,添加 synchronized(this)
可以解决问题,但是您将失去多线程和并行化带来的所有好处。
你需要的是ConcurrentMap
的computeIfAbsent
方法。所以你的 for
循环的主体将转换为
Pair<String, Integer> pair = wordOccurrencesMap.computeIfAbsent(word, w -> new Pair<>(w, 0));
synchronized(pair) {
pair.setValue(pair.getValue()+1);
}
现在您可以省略 synchronized(this)
块。
编辑:但是您必须确保当第一个线程调用 pair.setValue() 时,另一个线程不能调用 pair.getValue(),如评论所述。
我会尝试将我的两分钱添加到问题的 "best way to approach this problem" 部分:
- 您有一个 printByInsertionOrder() 方法。如果您在不保留某种位置数据的情况下并行处理文本,您将无法维持此顺序。
- 如果您得到的是人工编写的文本,您将遇到使用
wordsLine.split(" ")
拆分模式的一大堆麻烦。我建议改用"[^\p{L}0-9]+"
,它匹配 "at least one character not beeing a unicode letter or a digit"。\p{L}
特别强大,因为它还可以匹配 é、è 或 Ö 等变音符号。
我的建议是尽可能避免显式并发,并专注于函数式习语,例如:
public class SplitTest {
public static void main(String[] args) {
String text = "On the parseWordsLines an ExecutorService is created with a pool of 5 \n threads, and the WordOccurrencesBigFile class is instantiated with String with multiple lines created by \"\n\". The purpose is to have each line being processed by a different threads and insert on the Map the count of unique words.";
List<Entry<String, Point>> allWordsWithPositions = splitLines(text).entrySet().stream()
.parallel()
.flatMap(SplitTest::splitWordToStream)
.collect(Collectors.toList());
allWordsWithPositions.forEach(w -> System.out.println(w.getKey() + ":" + w.getValue()));
}
private static Map<String, Integer> splitLines(String text) {
String[] split = text.split("\n");
HashMap<String, Integer> lineMap = new HashMap<>();
for (int i = 0; i < split.length; ++i) {
lineMap.put(split[i], i);
}
return lineMap;
}
private static Stream<Entry<String, Point>> splitWordToStream(Entry<String, Integer> lineEntry) {
return splitWords(lineEntry.getValue(), lineEntry.getKey()).entrySet().stream();
}
private static Map<String, Point> splitWords(Integer lineNumber, String line) {
String[] split = line.split("[^\p{L}0-9]+");
HashMap<String, Point> wordMap = new HashMap<>();
for (int i = 0; i < split.length; ++i) {
wordMap.put(split[i], new Point(i, lineNumber));
}
return wordMap;
}
}
现在您有一个列表,其中包含正确拆分的所有单词以及它们在文本中的位置。并没有头痛地并行完成。您只需要 post- 处理结果以获得您需要的所有表示:)