多线程:识别重复对象
Multithreaded: Identifying duplicate objects
我正在尝试对 List 对象实施重复对象查找方法。目标是通过多线程遍历List,找出重复的对象。到目前为止,我使用 ExecutorService
如下。
ExecutorService executor = Executors.newFixedThreadPool(5);
for (int i = 0; i < jobs; i++) {
Runnable worker = new TaskToDo(jobs);
executor.execute(worker);
}
executor.shutdown();
while (!executor.isTerminated()) {
}
System.out.println("Finished all threads");
在 TaskToDo class 我遍历循环。当检测到重复项时,将从列表中删除其中的一个。以下是我遇到的问题,
- 在执行程序中使用多个线程时,结果不是预期的。列表中仍然存在一些重复值。但是执行者的单个线程工作得很好。我试过了
List<String> list = Collections.synchronizedList(new LinkedList<String>())
也是,但存在同样的问题。
- 我可以使用哪种最佳数据结构来删除重复项以获得更好的性能?
Google 给出了一些使用并发结构的结果。但很难找出实现这一目标的正确方法。
感谢你的帮助。提前致谢...:)
以下是遍历指定列表对象的代码。这里将比较文件的实际内容。
for(int i = currentTemp; i < list.size() - 1; i++){
if(isEqual(list.get(currentTemp), list.get(i+1))){
synchronized (list) {
list.remove(i + 1);
i--;
}}}
根据您当前的逻辑,您必须以更粗粒度的方式进行同步,否则您可能会删除错误的元素。
for (int i = currentTemp; i < list.size() - 1; i++) {
synchronized (list) {
if (i + 1 > list.size() && isEqual(list.get(currentTemp), list.get(i+1))) {
list.remove(i + 1);
i--;
}
}
}
你看,isEqual()
检查必须在同步块内,以确保等价检查与元素删除的原子性。假设您的大部分并发处理收益将来自使用 isEqual()
对列表元素的异步比较,此更改会使您寻求的任何收益无效。
此外,在同步块外检查 list.size()
也不够好,因为列表元素可能会被其他线程删除。除非您有办法在其他线程删除元素时向下调整列表索引,否则您的代码将不知不觉地跳过检查列表中的某些元素。其他线程正在将元素从当前线程的 for
循环中移出。
使用附加列表来跟踪应删除的索引会更好地执行此任务:
private volatile Set<Integer> indexesToRemove =
Collections.synchronizedSet(new TreeSet<Integer>(
new Comparator<Integer>() {
@Override public int compare(Integer i1, Integer i2) {
return i2.compareTo(i1); // sort descending for later element removal
}
}
));
以上内容应在与您的 list
相同的共享级别声明。那么遍历列表的代码应该是这样的,不需要同步:
int size = list.size();
for (int i = currentTemp; i < size - 1; i++) {
if (!indexesToRemove.contains(i + 1)) {
if (isEqual(list.get(currentTemp), list.get(i+1))) {
indexesToRemove.add(i + 1);
}
}
}
最后,在您 join()
将工作线程恢复为单个线程后,执行此操作以删除重复列表:
for (Integer i: indexesToRemove) {
list.remove(i.intValue());
}
因为我们为 indexesToRemove 使用了降序排序的 TreeSet,所以我们可以简单地迭代它的索引并从列表中删除每个索引。
如果您的算法作用于足够多的数据,您可能会真正从多线程中获益,那么您会遇到另一个问题,该问题往往会削弱任何性能优势。每个线程都必须扫描整个列表以查看它正在处理的元素是否重复,这将导致 CPU 缓存不断丢失,因为各种线程竞争访问列表的不同部分。
这称为 False Sharing。
即使 False Sharing 没有得到你,你也在 O(N^2) 中对列表进行重复数据删除,因为对于列表的每个元素,你重新迭代整个列表。
相反,请考虑使用 Set 来初始收集数据。如果不能这样做,请测试将列表元素添加到 Set 的性能。这应该是解决这个问题的一种非常有效的方法。
如果您要对大量文件进行重复数据删除,您确实应该使用基于散列的结构。同时修改列表是危险的,尤其是因为列表中的索引会不断从您的下方更改,这很糟糕。
如果你可以使用 Java 8,我的方法看起来像这样。假设您有一个 List<String> fileList
.
Collection<String> deduplicatedFiles = fileList.parallelStream()
.map(FileSystems.getDefault()::getPath) // convert strings to Paths
.collect(Collectors.toConcurrentMap(
path -> {
try {
return ByteBuffer.wrap(Files.readAllBytes(path)),
// read out the file contents and wrap in a ByteBuffer
// which is a suitable key for a hash map
} catch (IOException e) {
throw new RuntimeException(e);
}
},
path -> path.toString(), // in the values, convert back to string
(first, second) -> first) // resolve duplicates by choosing arbitrarily
.values();
这就是 整个 的事情:它同时读取所有文件,对它们进行哈希处理(尽管使用未指定的哈希算法可能不是 很好), 对它们进行重复数据删除,并吐出具有不同内容的文件列表。
如果您使用的是 Java 7,那么我会这样做。
CompletionService<Void> service = new ExecutorCompletionService<>(
Executors.newFixedThreadPool(4));
final ConcurrentMap<ByteBuffer, String> unique = new ConcurrentHashMap<>();
for (final String file : fileList) {
service.submit(new Runnable() {
@Override public void run() {
try {
ByteBuffer buffer = ByteBuffer.wrap(Files.readAllBytes(
FileSystem.getDefault().getPath(file)));
unique.putIfAbsent(buffer, file);
} catch (IOException e) {
throw new RuntimeException(e);
}
}, null);
}
for (int i = 0; i < fileList.size(); i++) {
service.take();
}
Collection<String> result = unique.values();
我正在尝试对 List 对象实施重复对象查找方法。目标是通过多线程遍历List,找出重复的对象。到目前为止,我使用 ExecutorService
如下。
ExecutorService executor = Executors.newFixedThreadPool(5);
for (int i = 0; i < jobs; i++) {
Runnable worker = new TaskToDo(jobs);
executor.execute(worker);
}
executor.shutdown();
while (!executor.isTerminated()) {
}
System.out.println("Finished all threads");
在 TaskToDo class 我遍历循环。当检测到重复项时,将从列表中删除其中的一个。以下是我遇到的问题,
- 在执行程序中使用多个线程时,结果不是预期的。列表中仍然存在一些重复值。但是执行者的单个线程工作得很好。我试过了
List<String> list = Collections.synchronizedList(new LinkedList<String>())
也是,但存在同样的问题。 - 我可以使用哪种最佳数据结构来删除重复项以获得更好的性能?
Google 给出了一些使用并发结构的结果。但很难找出实现这一目标的正确方法。 感谢你的帮助。提前致谢...:)
以下是遍历指定列表对象的代码。这里将比较文件的实际内容。
for(int i = currentTemp; i < list.size() - 1; i++){
if(isEqual(list.get(currentTemp), list.get(i+1))){
synchronized (list) {
list.remove(i + 1);
i--;
}}}
根据您当前的逻辑,您必须以更粗粒度的方式进行同步,否则您可能会删除错误的元素。
for (int i = currentTemp; i < list.size() - 1; i++) {
synchronized (list) {
if (i + 1 > list.size() && isEqual(list.get(currentTemp), list.get(i+1))) {
list.remove(i + 1);
i--;
}
}
}
你看,isEqual()
检查必须在同步块内,以确保等价检查与元素删除的原子性。假设您的大部分并发处理收益将来自使用 isEqual()
对列表元素的异步比较,此更改会使您寻求的任何收益无效。
此外,在同步块外检查 list.size()
也不够好,因为列表元素可能会被其他线程删除。除非您有办法在其他线程删除元素时向下调整列表索引,否则您的代码将不知不觉地跳过检查列表中的某些元素。其他线程正在将元素从当前线程的 for
循环中移出。
使用附加列表来跟踪应删除的索引会更好地执行此任务:
private volatile Set<Integer> indexesToRemove =
Collections.synchronizedSet(new TreeSet<Integer>(
new Comparator<Integer>() {
@Override public int compare(Integer i1, Integer i2) {
return i2.compareTo(i1); // sort descending for later element removal
}
}
));
以上内容应在与您的 list
相同的共享级别声明。那么遍历列表的代码应该是这样的,不需要同步:
int size = list.size();
for (int i = currentTemp; i < size - 1; i++) {
if (!indexesToRemove.contains(i + 1)) {
if (isEqual(list.get(currentTemp), list.get(i+1))) {
indexesToRemove.add(i + 1);
}
}
}
最后,在您 join()
将工作线程恢复为单个线程后,执行此操作以删除重复列表:
for (Integer i: indexesToRemove) {
list.remove(i.intValue());
}
因为我们为 indexesToRemove 使用了降序排序的 TreeSet,所以我们可以简单地迭代它的索引并从列表中删除每个索引。
如果您的算法作用于足够多的数据,您可能会真正从多线程中获益,那么您会遇到另一个问题,该问题往往会削弱任何性能优势。每个线程都必须扫描整个列表以查看它正在处理的元素是否重复,这将导致 CPU 缓存不断丢失,因为各种线程竞争访问列表的不同部分。
这称为 False Sharing。
即使 False Sharing 没有得到你,你也在 O(N^2) 中对列表进行重复数据删除,因为对于列表的每个元素,你重新迭代整个列表。
相反,请考虑使用 Set 来初始收集数据。如果不能这样做,请测试将列表元素添加到 Set 的性能。这应该是解决这个问题的一种非常有效的方法。
如果您要对大量文件进行重复数据删除,您确实应该使用基于散列的结构。同时修改列表是危险的,尤其是因为列表中的索引会不断从您的下方更改,这很糟糕。
如果你可以使用 Java 8,我的方法看起来像这样。假设您有一个 List<String> fileList
.
Collection<String> deduplicatedFiles = fileList.parallelStream()
.map(FileSystems.getDefault()::getPath) // convert strings to Paths
.collect(Collectors.toConcurrentMap(
path -> {
try {
return ByteBuffer.wrap(Files.readAllBytes(path)),
// read out the file contents and wrap in a ByteBuffer
// which is a suitable key for a hash map
} catch (IOException e) {
throw new RuntimeException(e);
}
},
path -> path.toString(), // in the values, convert back to string
(first, second) -> first) // resolve duplicates by choosing arbitrarily
.values();
这就是 整个 的事情:它同时读取所有文件,对它们进行哈希处理(尽管使用未指定的哈希算法可能不是 很好), 对它们进行重复数据删除,并吐出具有不同内容的文件列表。
如果您使用的是 Java 7,那么我会这样做。
CompletionService<Void> service = new ExecutorCompletionService<>(
Executors.newFixedThreadPool(4));
final ConcurrentMap<ByteBuffer, String> unique = new ConcurrentHashMap<>();
for (final String file : fileList) {
service.submit(new Runnable() {
@Override public void run() {
try {
ByteBuffer buffer = ByteBuffer.wrap(Files.readAllBytes(
FileSystem.getDefault().getPath(file)));
unique.putIfAbsent(buffer, file);
} catch (IOException e) {
throw new RuntimeException(e);
}
}, null);
}
for (int i = 0; i < fileList.size(); i++) {
service.take();
}
Collection<String> result = unique.values();