ReentrantLock 线程随机终止
ReentrantLock threads terminating randomly
我一直在做一项关于 Java 中多线程的学校作业。我卡住的任务之一是我们需要在不同的组中创建多个线程,当每个组中有4个线程时,才可以释放它们以协同工作,否则它们必须放在hold/waiting。例如:
- 线程a,b,c加入7组,都放在hold/waiting.
- 线程 d 加入组 7,所有四个线程 (a、b、c、d) 都收到终止信号。
- 线程 e,f,g,h,i 加入组 8,在这种情况下,当线程 i 处于等待状态时,e,f,g,h 将被终止。
- 线程j加入7组,等待
这是我完成的一般任务。我正在处理的任务要求我们释放一组的 INITIAL 前 4 个线程,其余的应该等到前面的 4 个线程调用了 finished()。
例如,3个线程加入组65,它们被置于等待状态。另一个线程加入组 65,所有 4 个线程一起释放。现在有 4 个线程正在工作(已终止)。现在线程 e,f,g,h,i,j,k,l 加入 65 组。所有线程都等待 e,f,g,h 调用 finished() 方法。
这是我到目前为止所做的:
ExtrinsicSync.java:
import java.util.HashMap;
import java.util.concurrent.locks.ReentrantLock;
public class ExtrinsicSync {
private HashMap<Integer, ConditionWrapper> groupThreadCount;
private ReentrantLock monitor;
private int count = 0;
ExtrinsicSync() {
groupThreadCount = new HashMap<>();
monitor = new ReentrantLock();
}
@Override
public void waitForThreadsInGroup(int groupId) {
monitor.lock();
if (!groupThreadCount.containsKey(groupId))
groupThreadCount.put(groupId, new ConditionWrapper(monitor.newCondition()));
ConditionWrapper condWrapper = groupThreadCount.get(groupId);
condWrapper.setValue(condWrapper.getValue() + 1);
if(condWrapper.getValue() == 4 && condWrapper.getInitialStatus())
{
condWrapper.getCondition().signalAll();
condWrapper.setInitialStatus(false);
System.out.println("Terminating group: " + groupId + "FROM INITIAL STATE: " + ++count);
} else {
System.out.println("Putting thread from group: " + groupId + " on wait: " + ++waitcount);
try { condWrapper.getCondition().await(); }
catch (InterruptedException e) { e.printStackTrace(); }
}
monitor.unlock();
}
@Override
public void finished(int groupId) {
monitor.lock();
ConditionWrapper condWrapper = groupThreadCount.get(groupId);
if(!condWrapper.getInitialStatus())
{
condWrapper.setFinishedCount(condWrapper.getFinishedCount() + 1);
System.out.println("Group: " + groupId + "FINISHED COUNT: " + condWrapper.getFinishedCount());
if(condWrapper.getFinishedCount() == 4)
{
condWrapper.setFinishedCount(0);
condWrapper.getCondition().signalAll();
System.out.println("Terminating threads for group: " + groupId + ": " + ++count);
}
}
monitor.unlock();
}
ExtrinsicSyncTest.java:
import org.junit.Test;
import java.util.EnumMap;
class TestTask1 implements Runnable{
final int group;
final ExtrinsicSync s1;
TestTask1(int group, ExtrinsicSync s1)
{
this.group = group;
this.s1 = s1;
}
public void run() { s1.waitForThreadsInGroup(group); s1.finished(group); }
}
public class ExtrinsicSyncTest {
@Test
public void testPhaseThreethreads() {
int nThreads = 22;
Thread t[] = new Thread[nThreads];
final ExtrinsicSync s1 = new ExtrinsicSync();
for(int i = 0; i < nThreads/2; i++)
(t[i] = new Thread(new TestTask1(66, s1))).start();
for(int i = nThreads/2; i < nThreads; i++)
(t[i] = new Thread(new TestTask1(70, s1))).start();
for (Thread ti : t)
{
try { ti.join(100); }
catch (Exception e) { System.out.println(e); }
}
EnumMap<Thread.State, Integer> threadsInThisState = new EnumMap<>(Thread.State.class);
for (Thread.State s : Thread.State.values())
threadsInThisState.put(s, 0);
for (Thread ti : t)
{
Thread.State state = ti.getState();
int n = threadsInThisState.get(state);
threadsInThisState.put(state, n + 1);
}
System.out.println("threadsInThisState: " + threadsInThisState.toString() );
}
}
ConditionWrapper.java:
import java.util.concurrent.locks.Condition;
public class ConditionWrapper {
private Condition cond;
private Integer value;
private Integer finishedCount;
private boolean initialThreads;
public ConditionWrapper(Condition condition)
{
this.cond = condition;
this.value = 0;
this.finishedCount = 0;
this.initialThreads = true;
}
// Returns the condition object of current request
public Condition getCondition()
{
return this.cond;
}
// Gets the current counter of threads waiting in this queue.
public Integer getValue()
{
return this.value;
}
// Sets the given value. Used for resetting the counter.
public void setValue(int value) { this.value = value; }
// Sets the counter to help keep track of threads which called finished() method
public void setFinishedCount(int count) { this.finishedCount = count; }
// Gets the finished count.
public Integer getFinishedCount() { return this.finishedCount; }
// This flag is to identify initial threads of a group
public boolean getInitialStatus() { return initialThreads; }
public void setInitialStatus(boolean val) { this.initialThreads = val; }
}
我遇到的问题是我能够释放每个组的前四个线程,但不知何故,某处有 2 个线程被随机终止,我无法弄清楚发生了什么。例如上面22个线程的测试用例分成两组,只有8个线程应该终止,其余线程等待。
但是这里有 10 个线程被终止了。我不明白这是怎么回事。我已尽可能将代码精简到最低限度。
问题在于,对于非初始线程 (getInitialStatus==false),您不会向其他线程发出信号,但当您达到其中四个时,您仍然会终止它们。所以这就是发生的事情:
- 前三个线程增加计数并等待
- 第四个线程达到 count == 4 并设置 initial = false 并向所有其他线程发出信号并将计数设置为零
- 接下来的三个线程将计数增加一个
- 8 个线程达到计数 == 4 并被终止。由于 getInitialStatus==false 此线程不会通知其他线程。
所以 4*2 个线程 + 2 个线程被终止。正是您在测试中看到的计数。
这里有一个可能的实现方式:
- 在每个线程或任务中使用标志 canExecute
- 使用方法 calculateState 计算当前状态,如果允许线程执行,则将标志设置为 true。
- 将所有等待的线程存储在列表或类似的东西中
因此您的任务将如下所示:
Task
boolean canExeute
方法 waitForThreadsInGroup 看起来像这样:
waitForThreadsInGroup
monitor.lock();
add task to list
calculateTaskState
condition.notifyAll
while( ! task.canExcecute )
{
condition.await.
}
monitor.unlock();
完成方法看起来很相似:
finish
monitor.lock();
decrement finish count
calculateTaskState
condition.notifyAll
monitor.unlock();
并计算TaskState
calculateTaskState
if( finishCount == 0)
{
if( taskList.size >= 4 )
{
set 4 tasks in this list to can execute and remove them from the list
}
}
所以诀窍是将逻辑分为三个步骤:
- 操作,例如减少完成计数
- 新状态的计算。并决定每个线程是否允许执行
- 以及线程的等待。每个线程都需要等待自己的标志
我一直在做一项关于 Java 中多线程的学校作业。我卡住的任务之一是我们需要在不同的组中创建多个线程,当每个组中有4个线程时,才可以释放它们以协同工作,否则它们必须放在hold/waiting。例如:
- 线程a,b,c加入7组,都放在hold/waiting.
- 线程 d 加入组 7,所有四个线程 (a、b、c、d) 都收到终止信号。
- 线程 e,f,g,h,i 加入组 8,在这种情况下,当线程 i 处于等待状态时,e,f,g,h 将被终止。
- 线程j加入7组,等待
这是我完成的一般任务。我正在处理的任务要求我们释放一组的 INITIAL 前 4 个线程,其余的应该等到前面的 4 个线程调用了 finished()。
例如,3个线程加入组65,它们被置于等待状态。另一个线程加入组 65,所有 4 个线程一起释放。现在有 4 个线程正在工作(已终止)。现在线程 e,f,g,h,i,j,k,l 加入 65 组。所有线程都等待 e,f,g,h 调用 finished() 方法。
这是我到目前为止所做的:
ExtrinsicSync.java:
import java.util.HashMap;
import java.util.concurrent.locks.ReentrantLock;
public class ExtrinsicSync {
private HashMap<Integer, ConditionWrapper> groupThreadCount;
private ReentrantLock monitor;
private int count = 0;
ExtrinsicSync() {
groupThreadCount = new HashMap<>();
monitor = new ReentrantLock();
}
@Override
public void waitForThreadsInGroup(int groupId) {
monitor.lock();
if (!groupThreadCount.containsKey(groupId))
groupThreadCount.put(groupId, new ConditionWrapper(monitor.newCondition()));
ConditionWrapper condWrapper = groupThreadCount.get(groupId);
condWrapper.setValue(condWrapper.getValue() + 1);
if(condWrapper.getValue() == 4 && condWrapper.getInitialStatus())
{
condWrapper.getCondition().signalAll();
condWrapper.setInitialStatus(false);
System.out.println("Terminating group: " + groupId + "FROM INITIAL STATE: " + ++count);
} else {
System.out.println("Putting thread from group: " + groupId + " on wait: " + ++waitcount);
try { condWrapper.getCondition().await(); }
catch (InterruptedException e) { e.printStackTrace(); }
}
monitor.unlock();
}
@Override
public void finished(int groupId) {
monitor.lock();
ConditionWrapper condWrapper = groupThreadCount.get(groupId);
if(!condWrapper.getInitialStatus())
{
condWrapper.setFinishedCount(condWrapper.getFinishedCount() + 1);
System.out.println("Group: " + groupId + "FINISHED COUNT: " + condWrapper.getFinishedCount());
if(condWrapper.getFinishedCount() == 4)
{
condWrapper.setFinishedCount(0);
condWrapper.getCondition().signalAll();
System.out.println("Terminating threads for group: " + groupId + ": " + ++count);
}
}
monitor.unlock();
}
ExtrinsicSyncTest.java:
import org.junit.Test;
import java.util.EnumMap;
class TestTask1 implements Runnable{
final int group;
final ExtrinsicSync s1;
TestTask1(int group, ExtrinsicSync s1)
{
this.group = group;
this.s1 = s1;
}
public void run() { s1.waitForThreadsInGroup(group); s1.finished(group); }
}
public class ExtrinsicSyncTest {
@Test
public void testPhaseThreethreads() {
int nThreads = 22;
Thread t[] = new Thread[nThreads];
final ExtrinsicSync s1 = new ExtrinsicSync();
for(int i = 0; i < nThreads/2; i++)
(t[i] = new Thread(new TestTask1(66, s1))).start();
for(int i = nThreads/2; i < nThreads; i++)
(t[i] = new Thread(new TestTask1(70, s1))).start();
for (Thread ti : t)
{
try { ti.join(100); }
catch (Exception e) { System.out.println(e); }
}
EnumMap<Thread.State, Integer> threadsInThisState = new EnumMap<>(Thread.State.class);
for (Thread.State s : Thread.State.values())
threadsInThisState.put(s, 0);
for (Thread ti : t)
{
Thread.State state = ti.getState();
int n = threadsInThisState.get(state);
threadsInThisState.put(state, n + 1);
}
System.out.println("threadsInThisState: " + threadsInThisState.toString() );
}
}
ConditionWrapper.java:
import java.util.concurrent.locks.Condition;
public class ConditionWrapper {
private Condition cond;
private Integer value;
private Integer finishedCount;
private boolean initialThreads;
public ConditionWrapper(Condition condition)
{
this.cond = condition;
this.value = 0;
this.finishedCount = 0;
this.initialThreads = true;
}
// Returns the condition object of current request
public Condition getCondition()
{
return this.cond;
}
// Gets the current counter of threads waiting in this queue.
public Integer getValue()
{
return this.value;
}
// Sets the given value. Used for resetting the counter.
public void setValue(int value) { this.value = value; }
// Sets the counter to help keep track of threads which called finished() method
public void setFinishedCount(int count) { this.finishedCount = count; }
// Gets the finished count.
public Integer getFinishedCount() { return this.finishedCount; }
// This flag is to identify initial threads of a group
public boolean getInitialStatus() { return initialThreads; }
public void setInitialStatus(boolean val) { this.initialThreads = val; }
}
我遇到的问题是我能够释放每个组的前四个线程,但不知何故,某处有 2 个线程被随机终止,我无法弄清楚发生了什么。例如上面22个线程的测试用例分成两组,只有8个线程应该终止,其余线程等待。
但是这里有 10 个线程被终止了。我不明白这是怎么回事。我已尽可能将代码精简到最低限度。
问题在于,对于非初始线程 (getInitialStatus==false),您不会向其他线程发出信号,但当您达到其中四个时,您仍然会终止它们。所以这就是发生的事情:
- 前三个线程增加计数并等待
- 第四个线程达到 count == 4 并设置 initial = false 并向所有其他线程发出信号并将计数设置为零
- 接下来的三个线程将计数增加一个
- 8 个线程达到计数 == 4 并被终止。由于 getInitialStatus==false 此线程不会通知其他线程。
所以 4*2 个线程 + 2 个线程被终止。正是您在测试中看到的计数。
这里有一个可能的实现方式:
- 在每个线程或任务中使用标志 canExecute
- 使用方法 calculateState 计算当前状态,如果允许线程执行,则将标志设置为 true。
- 将所有等待的线程存储在列表或类似的东西中
因此您的任务将如下所示:
Task
boolean canExeute
方法 waitForThreadsInGroup 看起来像这样:
waitForThreadsInGroup
monitor.lock();
add task to list
calculateTaskState
condition.notifyAll
while( ! task.canExcecute )
{
condition.await.
}
monitor.unlock();
完成方法看起来很相似:
finish
monitor.lock();
decrement finish count
calculateTaskState
condition.notifyAll
monitor.unlock();
并计算TaskState
calculateTaskState
if( finishCount == 0)
{
if( taskList.size >= 4 )
{
set 4 tasks in this list to can execute and remove them from the list
}
}
所以诀窍是将逻辑分为三个步骤:
- 操作,例如减少完成计数
- 新状态的计算。并决定每个线程是否允许执行
- 以及线程的等待。每个线程都需要等待自己的标志