多线程:识别重复对象

Multithreaded: Identifying duplicate objects

我正在尝试对 List 对象实施重复对象查找方法。目标是通过多线程遍历List,找出重复的对象。到目前为止,我使用 ExecutorService 如下。

ExecutorService executor = Executors.newFixedThreadPool(5);
    for (int i = 0; i < jobs; i++) {
        Runnable worker = new TaskToDo(jobs);
        executor.execute(worker);
    }
executor.shutdown();
while (!executor.isTerminated()) {
}
System.out.println("Finished all threads");

在 TaskToDo class 我遍历循环。当检测到重复项时,将从列表中删除其中的一个。以下是我遇到的问题,

  1. 在执行程序中使用多个线程时,结果不是预期的。列表中仍然存在一些重复值。但是执行者的单个线程工作得很好。我试过了 List<String> list = Collections.synchronizedList(new LinkedList<String>()) 也是,但存在同样的问题。
  2. 我可以使用哪种最佳数据结构来删除重复项以获得更好的性能?

Google 给出了一些使用并发结构的结果。但很难找出实现这一目标的正确方法。 感谢你的帮助。提前致谢...:)

以下是遍历指定列表对象的代码。这里将比较文件的实际内容。

for(int i = currentTemp; i < list.size() - 1; i++){
        if(isEqual(list.get(currentTemp), list.get(i+1))){
            synchronized (list) {
                list.remove(i + 1);
                i--;
}}}

根据您当前的逻辑,您必须以更粗粒度的方式进行同步,否则您可能会删除错误的元素。

for (int i = currentTemp; i < list.size() - 1; i++) {
  synchronized (list) {
    if (i + 1 > list.size() && isEqual(list.get(currentTemp), list.get(i+1))) {
      list.remove(i + 1);
      i--;
    }
  }
}

你看,isEqual() 检查必须在同步块内,以确保等价检查与元素删除的原子性。假设您的大部分并发处理收益将来自使用 isEqual() 对列表元素的异步比较,此更改会使您寻求的任何收益无效。

此外,在同步块外检查 list.size() 也不够好,因为列表元素可能会被其他线程删除。除非您有办法在其他线程删除元素时向下调整列表索引,否则您的代码将不知不觉地跳过检查列表中的某些元素。其他线程正在将元素从当前线程的 for 循环中移出。

使用附加列表来跟踪应删除的索引会更好地执行此任务:

private volatile Set<Integer> indexesToRemove =
  Collections.synchronizedSet(new TreeSet<Integer>(
    new Comparator<Integer>() {
      @Override public int compare(Integer i1, Integer i2) {
        return i2.compareTo(i1); // sort descending for later element removal
      }
    }
  ));

以上内容应在与您的 list 相同的共享级别声明。那么遍历列表的代码应该是这样的,不需要同步:

int size = list.size();
for (int i = currentTemp; i < size - 1; i++) {
  if (!indexesToRemove.contains(i + 1)) {
    if (isEqual(list.get(currentTemp), list.get(i+1))) {
      indexesToRemove.add(i + 1);
    }
  }
}

最后,在您 join() 将工作线程恢复为单个线程后,执行此操作以删除重复列表:

for (Integer i: indexesToRemove) {
  list.remove(i.intValue());
}

因为我们为 indexesToRemove 使用了降序排序的 TreeSet,所以我们可以简单地迭代它的索引并从列表中删除每个索引。

如果您的算法作用于足够多的数据,您可能会真正从多线程中获益,那么您会遇到另一个问题,该问题往往会削弱任何性能优势。每个线程都必须扫描整个列表以查看它正在处理的元素是否重复,这将导致 CPU 缓存不断丢失,因为各种线程竞争访问列表的不同部分。

这称为 False Sharing

即使 False Sharing 没有得到你,你也在 O(N^2) 中对列表进行重复数据删除,因为对于列表的每个元素,你重新迭代整个列表。

相反,请考虑使用 Set 来初始收集数据。如果不能这样做,请测试将列表元素添加到 Set 的性能。这应该是解决这个问题的一种非常有效的方法。

如果您要对大量文件进行重复数据删除,您确实应该使用基于散列的结构。同时修改列表是危险的,尤其是因为列表中的索引会不断从您的下方更改,这很糟糕。

如果你可以使用 Java 8,我的方法看起来像这样。假设您有一个 List<String> fileList.

 Collection<String> deduplicatedFiles = fileList.parallelStream()
    .map(FileSystems.getDefault()::getPath) // convert strings to Paths
    .collect(Collectors.toConcurrentMap(
       path -> {
          try {
             return ByteBuffer.wrap(Files.readAllBytes(path)),
             // read out the file contents and wrap in a ByteBuffer
             // which is a suitable key for a hash map
          } catch (IOException e) {
            throw new RuntimeException(e);
          }
        },
       path -> path.toString(), // in the values, convert back to string
       (first, second) -> first) // resolve duplicates by choosing arbitrarily
    .values();

这就是 整个 的事情:它同时读取所有文件,对它们进行哈希处理(尽管使用未指定的哈希算法可能不是 很好), 对它们进行重复数据删除,并吐出具有不同内容的文件列表。

如果您使用的是 Java 7,那么我会这样做。

 CompletionService<Void> service = new ExecutorCompletionService<>(
     Executors.newFixedThreadPool(4));
 final ConcurrentMap<ByteBuffer, String> unique = new ConcurrentHashMap<>();
 for (final String file : fileList) {
    service.submit(new Runnable() {
      @Override public void run() {
        try {
          ByteBuffer buffer = ByteBuffer.wrap(Files.readAllBytes(
              FileSystem.getDefault().getPath(file)));
          unique.putIfAbsent(buffer, file);
        } catch (IOException e) {
          throw new RuntimeException(e);
        }
      }, null);
 }
 for (int i = 0; i < fileList.size(); i++) {
   service.take();
 }
 Collection<String> result = unique.values();