Java 多线程:线程应访问列表

Java Multithreading: threads shall access list

我有 运行 五个线程,并且我有一个对象列表(我独立于线程对其进行了初始化)。 列表中的对象使用布尔值作为标志,所以我知道它们是否已经被另一个线程处理过。此外,我的线程的“ID”有一个整数(所以你知道哪个线程当前正在工作)。

问题:第一个处理 for 循环的线程将处理列表中的所有对象,但我希望线程交替。我做错了什么?

运行() 方法类似于:

void run() {
    for (int i = 0; i < list.size(); i++) {
        ListObject currentObject = list.get(i);
        synchronized (currentObject) {
            if (currentObject.getHandled == false) {
                currentObject.setHandled(true);
                System.out.println("Object is handled by " + this.getID());
            } else {
                continue;
            }
        }
    }
}

TL;DR 在线程之间显式或隐式划分列表;和同步,如果真的需要的话;

The problem: The first thread that gets a hand on the for-loop will handle all objects in the list, but i want the threads to alternate. What am I doing wrong?

整个代码块都是可以预料的

  for (int i = 0; i < list.size(); i++) {
        ListObject currentObject = list.get(i);
        synchronized (currentObject) {
            ....
        }
    }

基本上 顺序执行 因为每个线程在每次迭代中使用对象 currentObject 隐式锁进行同步。所有五个线程都进入 run 方法,但是其中一个在 synchronized (currentObject) 中首先进入,所有其他线程将依次等待第一个线程释放 currentObject 隐式锁。当线程完成后继续下一次迭代,而其余线程仍在上一次迭代中。因此,第一个进入 synchronized (currentObject) 的线程将有一个领先的开始,并且将成为前一个线程的步长,并且可能会计算所有剩余的迭代。因此:

The first thread that gets a hand on the for-loop will handle all objects in the list,

按顺序执行代码在性能和可读性方面会更好。

假设

我假设

  1. 存储在列表中的对象不会在这些线程遍历列表的同时在其他地方被访问;
  2. 该列表不包含对同一对象的多个引用;

我建议,与其让每个线程都遍历整个列表并在每次迭代中进行同步——这是非常不执行的,实际上破坏了并行性——每个线程都将计算列表的不同块( 例如, 在线程之间划分 for 循环的迭代)。例如:

方法一:使用并行流

如果您不必显式并行化代码,请考虑使用 ParallelStream:

list.parallelStream().forEach(this::setHandled);
   
private void setHandled(ListObject currentObject) {
    if (!currentObject.getHandled) {
        currentObject.setHandled(true);
        System.out.println("Object is handled by " + this.getID());
    }
}

方法 2:如果您必须使用执行程序显式并行化代码

I'm running five threads,

(如 ernest_k 所示)

 ExecutorService ex = Executors.newFixedThreadPool(5);
 for (ListObject l : list)
     ex.submit(() -> setHandled(l));
 ...

private void setHandled(ListObject currentObject) {
    if (!currentObject.getHandled) {
        currentObject.setHandled(true);
        System.out.println("Object is handled by " + this.getID());
    }
}

方法 3:如果必须显式使用线程

void run() {
    for (int i = threadID; i < list.size(); i += total_threads) {
        ListObject currentObject = list.get(i);
        if (currentObject.getHandled == false) {
           currentObject.setHandled(true);
           System.out.println("Object is handled by " + this.getID());
       }
    }
}

在这种方法中,我以循环方式在线程之间拆分 for 循环的迭代,假设 total_threads 是线程的数量将计算 run 方法,并且每个线程将有一个唯一的 threadID,范围从 0total_threads - 1。在线程之间分配迭代的其他方法也是可见的,例如在线程之间动态分配迭代:

void run() {
    for (int i = task.getAndIncrement(); i < list.size(); i = task.getAndIncrement();) {
        ListObject currentObject = list.get(i);
        if (currentObject.getHandled == false) {
           currentObject.setHandled(true);
           System.out.println("Object is handled by " + this.getID());
       }
    }
}

其中 task 将是一个原子整数(即 AtomicInteger task = new AtomicInteger();)。

在所有方法中,想法都是相同的将列表的不同块分配给线程,以便这些线程可以彼此独立地执行这些块。


如果无法做出假设 1. 和 2.,那么您仍然可以应用上述在线程之间拆分迭代的逻辑,但您需要添加同步,在我的示例中到以下代码块:

  private void setHandled(ListObject currentObject) {
        if (!currentObject.getHandled) {
            currentObject.setHandled(true);
            System.out.println("Object is handled by " + this.getID());
        }
    }

实际上,您只需将 currentObject 字段转换为 AtomicBoolean,如下所示:

private void setHandled(ListObject currentObject) {
    if (currentObject.getHandled.compareAndSet(false, true)) {
        System.out.println("Object is handled by " + this.getID());
    }
}

否则使用同步子句:

private void setHandled(ListObject currentObject) {
        synchronized (currentObject) {
            if (!currentObject.getHandled) {
                currentObject.setHandled(true);
                System.out.println("Object is handled by " + this.getID());
            }
       }
 }