多线程(ExecutorService)方法意外工作
Multithreading (ExecutorService) method works unexpectedly
我不明白哪里错了。
我正在尝试制作一个多线程程序,它将在数组中找到那些具有相同符号的字符串。然后应该打印这些字符串及其索引。
Example input: {"qwe","wqe","qwee","a","a"};
Output:
a = 3, 4
eqw = 0, 1
但是当我尝试 运行 程序时,它们的索引发生了一些变化。
我尝试以这种方式同步我的方法:使用(this)同步,在方法级别同步,
使用为每个对象创建的锁进行同步。也许我遗漏了一些东西,但有些东西可以让它发挥作用?
public class Main {
private volatile Map<String, Integer> countByString = new ConcurrentHashMap<>();
private volatile Map<String, String> indexesByString = new ConcurrentHashMap<>();
public static void main(String[] args) {
String[] arr = {"qwe", "wqe", "qwee", "a", "a"};
Main main = new Main();
List<Callable<Void>> tasks = new ArrayList<>();
AtomicInteger i = new AtomicInteger(0);
Arrays.stream(arr).forEach(str -> {
tasks.add(() -> {
char[] charArr = str.toCharArray();
Arrays.sort(charArr);
String sortedStr = new String(charArr);
main.calculateCount(sortedStr);
main.calculateIndex(sortedStr, i.getAndIncrement());
return null;
});
});
try {
ExecutorService executorService = Executors.newFixedThreadPool(4);
executorService.invokeAll(tasks);
executorService.shutdown();
} catch (Exception e) {
e.printStackTrace();
}
main.printResult();
}
void calculateCount(String str) {
synchronized (countByString) {
int count = countByString.get(str) == null ? 0 : countByString.get(str);
countByString.put(str, ++count);
}
}
void calculateIndex(String str, int index) {
synchronized (indexesByString) {
System.out.println(Thread.currentThread().getName() + " " + str + " " + index);
String indexes = indexesByString.get(str);
if (indexes == null) {
indexes = "";
}
indexes += (index + ";");
indexesByString.put(str, indexes);
}
}
private void printResult() {
for (Map.Entry<String, Integer> entry : countByString.entrySet()) {
String str = entry.getKey();
// Integer count = entry.getValue();
// if (count >= 2) {
String indexes = indexesByString.get(str);
System.out.println(str + " = " + indexes);
// }
}
}
}
你的代码的问题是这部分:
Arrays.stream(arr).forEach(str -> {
tasks.add(() -> {
char[] charArr = str.toCharArray();
Arrays.sort(charArr);
String sortedStr = new String(charArr);
main.calculateCount(sortedStr);
main.calculateIndex(sortedStr, i.getAndIncrement());
return null;
});
});
您正在将相同的工作(在整个字符串数组中搜索)分配给所有线程。因此,线程正在覆盖彼此的索引。相反,您应该做的是在线程之间分配搜索字符串的工作。类似于:
tasks.add(() -> {
for(int threadIndex = i.getAndIncrement(); threadIndex < arr.length; threadIndex = i.getAndIncrement()){
char[] charArr = arr[threadIndex].toCharArray();
Arrays.sort(charArr);
String sortedStr = new String(charArr);
main.calculateCount(sortedStr);
main.calculateIndex(sortedStr, threadIndex);
}
return null;
});
我使用 AtomicInteger
的线程安全属性来模拟线程间循环迭代的动态工作分配。
您提供的代码还有其他问题,主要与性能和同步开销有关。
private volatile Map<String, Integer> countByString = new ConcurrentHashMap<>();
private volatile Map<String, String> indexesByString = new ConcurrentHashMap<>();
在这种情况下,您既不需要 volatile
也不需要 ConcurrentHashMap
,但是,您可以将这些字段设置为 final
。 volatile
子句可以删除,因为您在并行区域之前创建了对象 main
,并且 ConcurrentHashMap
可以更改为 HashMap
,因为您已经同步了这些字段无论如何。
在关注代码的正确性之后,您应该尽量减少同步开销。例如,您可以尝试在每个线程中复制 countByString
和 indexesByString
,并在并行工作完成后按顺序减少它们的值。
自然地,考虑到您当前输入的大小,很难注意到性能优化之间有意义的差异。
我不明白哪里错了。 我正在尝试制作一个多线程程序,它将在数组中找到那些具有相同符号的字符串。然后应该打印这些字符串及其索引。
Example input: {"qwe","wqe","qwee","a","a"};
Output:
a = 3, 4
eqw = 0, 1
但是当我尝试 运行 程序时,它们的索引发生了一些变化。 我尝试以这种方式同步我的方法:使用(this)同步,在方法级别同步, 使用为每个对象创建的锁进行同步。也许我遗漏了一些东西,但有些东西可以让它发挥作用?
public class Main {
private volatile Map<String, Integer> countByString = new ConcurrentHashMap<>();
private volatile Map<String, String> indexesByString = new ConcurrentHashMap<>();
public static void main(String[] args) {
String[] arr = {"qwe", "wqe", "qwee", "a", "a"};
Main main = new Main();
List<Callable<Void>> tasks = new ArrayList<>();
AtomicInteger i = new AtomicInteger(0);
Arrays.stream(arr).forEach(str -> {
tasks.add(() -> {
char[] charArr = str.toCharArray();
Arrays.sort(charArr);
String sortedStr = new String(charArr);
main.calculateCount(sortedStr);
main.calculateIndex(sortedStr, i.getAndIncrement());
return null;
});
});
try {
ExecutorService executorService = Executors.newFixedThreadPool(4);
executorService.invokeAll(tasks);
executorService.shutdown();
} catch (Exception e) {
e.printStackTrace();
}
main.printResult();
}
void calculateCount(String str) {
synchronized (countByString) {
int count = countByString.get(str) == null ? 0 : countByString.get(str);
countByString.put(str, ++count);
}
}
void calculateIndex(String str, int index) {
synchronized (indexesByString) {
System.out.println(Thread.currentThread().getName() + " " + str + " " + index);
String indexes = indexesByString.get(str);
if (indexes == null) {
indexes = "";
}
indexes += (index + ";");
indexesByString.put(str, indexes);
}
}
private void printResult() {
for (Map.Entry<String, Integer> entry : countByString.entrySet()) {
String str = entry.getKey();
// Integer count = entry.getValue();
// if (count >= 2) {
String indexes = indexesByString.get(str);
System.out.println(str + " = " + indexes);
// }
}
}
}
你的代码的问题是这部分:
Arrays.stream(arr).forEach(str -> {
tasks.add(() -> {
char[] charArr = str.toCharArray();
Arrays.sort(charArr);
String sortedStr = new String(charArr);
main.calculateCount(sortedStr);
main.calculateIndex(sortedStr, i.getAndIncrement());
return null;
});
});
您正在将相同的工作(在整个字符串数组中搜索)分配给所有线程。因此,线程正在覆盖彼此的索引。相反,您应该做的是在线程之间分配搜索字符串的工作。类似于:
tasks.add(() -> {
for(int threadIndex = i.getAndIncrement(); threadIndex < arr.length; threadIndex = i.getAndIncrement()){
char[] charArr = arr[threadIndex].toCharArray();
Arrays.sort(charArr);
String sortedStr = new String(charArr);
main.calculateCount(sortedStr);
main.calculateIndex(sortedStr, threadIndex);
}
return null;
});
我使用 AtomicInteger
的线程安全属性来模拟线程间循环迭代的动态工作分配。
您提供的代码还有其他问题,主要与性能和同步开销有关。
private volatile Map<String, Integer> countByString = new ConcurrentHashMap<>();
private volatile Map<String, String> indexesByString = new ConcurrentHashMap<>();
在这种情况下,您既不需要 volatile
也不需要 ConcurrentHashMap
,但是,您可以将这些字段设置为 final
。 volatile
子句可以删除,因为您在并行区域之前创建了对象 main
,并且 ConcurrentHashMap
可以更改为 HashMap
,因为您已经同步了这些字段无论如何。
在关注代码的正确性之后,您应该尽量减少同步开销。例如,您可以尝试在每个线程中复制 countByString
和 indexesByString
,并在并行工作完成后按顺序减少它们的值。
自然地,考虑到您当前输入的大小,很难注意到性能优化之间有意义的差异。