Callable class 中的并发修改异常
Concurrent Modification Exception in Callable class
我试图在较小的子列表中拆分对象列表,并在不同的线程上分别处理它们。所以我有以下代码:
List<Instance> instances = xmlInstance.readInstancesFromXml();
List<Future<List<Instance>>> futureList = new ArrayList<>();
int nThreads = 4;
ExecutorService executor = Executors.newFixedThreadPool(nThreads);
final List<List<Instance>> instancesPerThread = split(instances, nThreads);
for (List<Instance> instancesThread : instancesPerThread) {
if (instancesThread.isEmpty()) {
break;
}
Callable<List<Instance>> callable = new MyCallable(instancesThread);
Future<List<Instance>> submit = executor.submit(callable);
futureList.add(submit);
}
instances.clear();
for (Future<List<Instance>> future : futureList) {
try {
final List<Instance> instancesFromFuture = future.get();
instances.addAll(instancesFromFuture);
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
}
executor.shutdown();
try {
executor.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS);
} catch (InterruptedException ie) {
ie.printStackTrace();
}
还有 MyCallable class :
public class MyCallable implements Callable<List<Instance>> {
private List<Instance> instances;
public MyCallable (List<Instance> instances) {
this.instances = Collections.synchronizedList(instances);
}
@Override
public List<Instance> call() throws Exception {
for (Instance instance : instances) {
//process each object and changing some fields;
}
return instances;
}
}
拆分方法(它将给定列表拆分为给定数量的列表,并尝试在每个子列表上具有几乎相同的大小):
public static List<List<Instance>> split(List<Instance> list, int nrOfThreads) {
List<List<Instance>> parts = new ArrayList<>();
final int nrOfItems = list.size();
int minItemsPerThread = nrOfItems / nrOfThreads;
int maxItemsPerThread = minItemsPerThread + 1;
int threadsWithMaxItems = nrOfItems - nrOfThreads * minItemsPerThread;
int start = 0;
for (int i = 0; i < nrOfThreads; i++) {
int itemsCount = (i < threadsWithMaxItems ? maxItemsPerThread : minItemsPerThread);
int end = start + itemsCount;
parts.add(list.subList(start, end));
start = end;
}
return parts;
}
所以,当我尝试执行它时,我在这一行 java.util.ConcurrentModificationException for (Instance instance : instances) {
上得到了 for (Instance instance : instances) {
,有人可以给出它发生的任何想法吗?
public MyCallable (List<Instance> instances) {
this.instances = Collections.synchronizedList(instances);
}
像这样使用 synchronizedList
并不能像您认为的那样帮助您。
只有在创建列表时将列表包装在 synchronizedList
中才有用(例如 Collections.synchronizedList(new ArrayList<>())
。否则,底层列表可以直接访问,因此可以以非同步方式访问。
此外,synchronizedList
仅在单个方法调用期间同步,而不是在您迭代它时的整个时间。
此处最简单的解决方法是在构造函数中复制列表:
this.instances = new ArrayList<>(instances);
然后,没有其他人可以访问该列表,因此他们无法在您迭代它时更改它。
这与在 call
方法中获取列表的副本不同,因为副本是在代码的 single-threaded 部分完成的:当您进行修改时,其他线程无法修改它正在获取该副本,因此您不会获得 ConcurrentModificationException
(您 可以 在 single-threaded 代码中获得 CME,但不使用此复制构造函数)。在 call
方法中进行复制意味着列表被迭代,其方式与您已有的 for
循环完全相同。
我试图在较小的子列表中拆分对象列表,并在不同的线程上分别处理它们。所以我有以下代码:
List<Instance> instances = xmlInstance.readInstancesFromXml();
List<Future<List<Instance>>> futureList = new ArrayList<>();
int nThreads = 4;
ExecutorService executor = Executors.newFixedThreadPool(nThreads);
final List<List<Instance>> instancesPerThread = split(instances, nThreads);
for (List<Instance> instancesThread : instancesPerThread) {
if (instancesThread.isEmpty()) {
break;
}
Callable<List<Instance>> callable = new MyCallable(instancesThread);
Future<List<Instance>> submit = executor.submit(callable);
futureList.add(submit);
}
instances.clear();
for (Future<List<Instance>> future : futureList) {
try {
final List<Instance> instancesFromFuture = future.get();
instances.addAll(instancesFromFuture);
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
}
executor.shutdown();
try {
executor.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS);
} catch (InterruptedException ie) {
ie.printStackTrace();
}
还有 MyCallable class :
public class MyCallable implements Callable<List<Instance>> {
private List<Instance> instances;
public MyCallable (List<Instance> instances) {
this.instances = Collections.synchronizedList(instances);
}
@Override
public List<Instance> call() throws Exception {
for (Instance instance : instances) {
//process each object and changing some fields;
}
return instances;
}
}
拆分方法(它将给定列表拆分为给定数量的列表,并尝试在每个子列表上具有几乎相同的大小):
public static List<List<Instance>> split(List<Instance> list, int nrOfThreads) {
List<List<Instance>> parts = new ArrayList<>();
final int nrOfItems = list.size();
int minItemsPerThread = nrOfItems / nrOfThreads;
int maxItemsPerThread = minItemsPerThread + 1;
int threadsWithMaxItems = nrOfItems - nrOfThreads * minItemsPerThread;
int start = 0;
for (int i = 0; i < nrOfThreads; i++) {
int itemsCount = (i < threadsWithMaxItems ? maxItemsPerThread : minItemsPerThread);
int end = start + itemsCount;
parts.add(list.subList(start, end));
start = end;
}
return parts;
}
所以,当我尝试执行它时,我在这一行 java.util.ConcurrentModificationException for (Instance instance : instances) {
上得到了 for (Instance instance : instances) {
,有人可以给出它发生的任何想法吗?
public MyCallable (List<Instance> instances) {
this.instances = Collections.synchronizedList(instances);
}
像这样使用 synchronizedList
并不能像您认为的那样帮助您。
只有在创建列表时将列表包装在 synchronizedList
中才有用(例如 Collections.synchronizedList(new ArrayList<>())
。否则,底层列表可以直接访问,因此可以以非同步方式访问。
此外,synchronizedList
仅在单个方法调用期间同步,而不是在您迭代它时的整个时间。
此处最简单的解决方法是在构造函数中复制列表:
this.instances = new ArrayList<>(instances);
然后,没有其他人可以访问该列表,因此他们无法在您迭代它时更改它。
这与在 call
方法中获取列表的副本不同,因为副本是在代码的 single-threaded 部分完成的:当您进行修改时,其他线程无法修改它正在获取该副本,因此您不会获得 ConcurrentModificationException
(您 可以 在 single-threaded 代码中获得 CME,但不使用此复制构造函数)。在 call
方法中进行复制意味着列表被迭代,其方式与您已有的 for
循环完全相同。