ReentrantLock 线程随机终止

ReentrantLock threads terminating randomly

我一直在做一项关于 Java 中多线程的学校作业。我卡住的任务之一是我们需要在不同的组中创建多个线程,当每个组中有4个线程时,才可以释放它们以协同工作,否则它们必须放在hold/waiting。例如:

这是我完成的一般任务。我正在处理的任务要求我们释放一组的 INITIAL 前 4 个线程,其余的应该等到前面的 4 个线程调用了 finished()。

例如,3个线程加入组65,它们被置于等待状态。另一个线程加入组 65,所有 4 个线程一起释放。现在有 4 个线程正在工作(已终止)。现在线程 e,f,g,h,i,j,k,l 加入 65 组。所有线程都等待 e,f,g,h 调用 finished() 方法。

这是我到目前为止所做的:

ExtrinsicSync.java:

import java.util.HashMap;
import java.util.concurrent.locks.ReentrantLock;

public class ExtrinsicSync {

    private HashMap<Integer, ConditionWrapper> groupThreadCount;
    private ReentrantLock monitor;
    private int count = 0;

    ExtrinsicSync() {
        groupThreadCount = new HashMap<>();
        monitor = new ReentrantLock();
    }

@Override
public void waitForThreadsInGroup(int groupId) {
    monitor.lock();

    if (!groupThreadCount.containsKey(groupId))
        groupThreadCount.put(groupId, new ConditionWrapper(monitor.newCondition()));

    ConditionWrapper condWrapper = groupThreadCount.get(groupId);
    condWrapper.setValue(condWrapper.getValue() + 1);

    if(condWrapper.getValue() == 4 && condWrapper.getInitialStatus())
    {
        condWrapper.getCondition().signalAll();
        condWrapper.setInitialStatus(false);

        System.out.println("Terminating group: " + groupId + "FROM INITIAL STATE: " + ++count);
    } else {
        System.out.println("Putting thread from group: " + groupId + " on wait: " + ++waitcount);
        try { condWrapper.getCondition().await(); }
        catch (InterruptedException e) { e.printStackTrace(); }

    }

    monitor.unlock();
}

@Override
public void finished(int groupId) {
    monitor.lock();
    ConditionWrapper condWrapper = groupThreadCount.get(groupId);

    if(!condWrapper.getInitialStatus())
    {
        condWrapper.setFinishedCount(condWrapper.getFinishedCount() + 1);
        System.out.println("Group: " + groupId + "FINISHED COUNT: " + condWrapper.getFinishedCount());
        if(condWrapper.getFinishedCount() == 4)
        {
            condWrapper.setFinishedCount(0);
            condWrapper.getCondition().signalAll();
            System.out.println("Terminating threads for group: " + groupId + ": " + ++count);
        }
    }
    monitor.unlock();
}

ExtrinsicSyncTest.java:

import org.junit.Test;

import java.util.EnumMap;

class TestTask1 implements Runnable{

    final int group;
    final ExtrinsicSync s1;

    TestTask1(int group, ExtrinsicSync s1)
    {
        this.group = group;
        this.s1 = s1;
    }

    public void run() { s1.waitForThreadsInGroup(group); s1.finished(group); }
}

public class ExtrinsicSyncTest {

    @Test
    public void testPhaseThreethreads() {

        int nThreads = 22;

        Thread t[] = new Thread[nThreads];
        final ExtrinsicSync s1 = new ExtrinsicSync();

        for(int i = 0; i < nThreads/2; i++)
            (t[i] = new Thread(new TestTask1(66, s1))).start();

        for(int i = nThreads/2; i < nThreads; i++)
            (t[i] = new Thread(new TestTask1(70, s1))).start();

        for (Thread ti : t)
        {
            try { ti.join(100); }
            catch (Exception e) { System.out.println(e); }
        }

        EnumMap<Thread.State, Integer> threadsInThisState = new EnumMap<>(Thread.State.class);

        for (Thread.State s : Thread.State.values())
            threadsInThisState.put(s, 0);

        for (Thread ti : t)
        {
            Thread.State state = ti.getState();
            int n = threadsInThisState.get(state);
            threadsInThisState.put(state, n + 1);
        }

        System.out.println("threadsInThisState: " + threadsInThisState.toString() );

    }
}

ConditionWrapper.java:

import java.util.concurrent.locks.Condition;

public class ConditionWrapper {
    private Condition cond;
    private Integer value;
    private Integer finishedCount;
    private boolean initialThreads;

    public ConditionWrapper(Condition condition)
    {
        this.cond = condition;
        this.value = 0;
        this.finishedCount = 0;
        this.initialThreads = true;
    }
    // Returns the condition object of current request
    public Condition getCondition()
    {
        return this.cond;
    }
    // Gets the current counter of threads waiting in this queue.
    public Integer getValue()
    {
        return this.value;
    }
    // Sets the given value. Used for resetting the counter.
    public void setValue(int value) { this.value = value; }
    // Sets the counter to help keep track of threads which called finished() method
    public void setFinishedCount(int count) { this.finishedCount = count; }
    // Gets the finished count.
    public Integer getFinishedCount() { return this.finishedCount; }
    // This flag is to identify initial threads of a group
    public boolean getInitialStatus() { return initialThreads; }
    public void setInitialStatus(boolean val) { this.initialThreads = val; }
}

我遇到的问题是我能够释放每个组的前四个线程,但不知何故,某处有 2 个线程被随机终止,我无法弄清楚发生了什么。例如上面22个线程的测试用例分成两组,只有8个线程应该终止,其余线程等待。

但是这里有 10 个线程被终止了。我不明白这是怎么回事。我已尽可能将代码精简到最低限度。

问题在于,对于非初始线程 (getInitialStatus==false),您不会向其他线程发出信号,但当您达到其中四个时,您仍然会终止它们。所以这就是发生的事情:

  1. 前三个线程增加计数并等待
  2. 第四个线程达到 count == 4 并设置 initial = false 并向所有其他线程发出信号并将计数设置为零
  3. 接下来的三个线程将计数增加一个
  4. 8 个线程达到计数 == 4 并被终止。由于 getInitialStatus==false 此线程不会通知其他线程。

所以 4*2 个线程 + 2 个线程被终止。正是您在测试中看到的计数。


这里有一个可能的实现方式:

  1. 在每个线程或任务中使用标志 canExecute
  2. 使用方法 calculateState 计算当前状态,如果允许线程执行,则将标志设置为 true。
  3. 将所有等待的线程存储在列表或类似的东西中

因此您的任务将如下所示:

Task
  boolean canExeute

方法 waitForThreadsInGroup 看起来像这样:

waitForThreadsInGroup
  monitor.lock();
      add task to list
      calculateTaskState
      condition.notifyAll
      while( ! task.canExcecute )
      {
        condition.await.
      }

  monitor.unlock();

完成方法看起来很相似:

  finish
    monitor.lock();
    decrement finish count
    calculateTaskState
   condition.notifyAll
   monitor.unlock();

并计算TaskState

calculateTaskState
  if( finishCount == 0)
  {
      if( taskList.size >= 4  )
      {
         set 4 tasks in this list to can execute and remove them from the list
      }
  }

所以诀窍是将逻辑分为三个步骤:

  1. 操作,例如减少完成计数
  2. 新状态的计算。并决定每个线程是否允许执行
  3. 以及线程的等待。每个线程都需要等待自己的标志