三线程通信与同步
Three Threads Comnunition and Synchronize
背景:我有三个线程。 ThreadA负责向队列中写入元素,如果队列已满,则通知ThreadC从队列中读取元素。 ThreadB是另一种情况,如果队列没有满,但是时间超过5秒,那么Thread通知threadC从队列中取元素,最后,ThreadC通知ThreadB刷新它的时间戳。
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
public class Main {
private Lock lock = new ReentrantLock();
private Condition conA = lock.newCondition();
private Condition conB = lock.newCondition();
private Condition conC = lock.newCondition();
ArrayBlockingQueue<Integer> readQueueA = new ArrayBlockingQueue<>(3);
public static void main(String[] args) {
Main main1 = new Main();
try {
ThreadA threadWrite = main1.new ThreadA();
Thread threadOut = new Thread(threadWrite);
threadOut.start();
ThreadB threadB = main1.new ThreadB();
Thread threadBB = new Thread(threadB);
threadBB.start();
ThreadC threadRead = main1.new ThreadC();
Thread threadIn = new Thread(threadRead);
threadIn.start();
} catch (Exception ex) {
ex.printStackTrace();
}
}
class ThreadA implements Runnable {
public void addElement(Integer i) {
try {
readQueueA.put(i);
} catch (Exception ex ){
ex.printStackTrace();
}
}
@Override
public void run() {
int i = 0;
while (i < 10) {
lock.lock();
try {
if (readQueueA.size() < 3) {
addElement(i++);
} else {
System.out.println("notice C: " + new Date());
conC.signal();
conA.await();
}
} catch (Exception ex) {
ex.printStackTrace();
} finally {
lock.unlock();
}
}
}
}
class ThreadB implements Runnable {
@Override
public void run() {
reSetStartTime();
while (true) {
lock.lock();
try {
if (!conB.await(5, TimeUnit.SECONDS)) {
System.out.println("Timeout Zzzzzzz: " + new Date());
conC.signal();
} else {
System.out.println("RefreshB。。。。" + new Date());
}
} catch (Exception ex) {
ex.printStackTrace();
} finally {
lock.unlock();
}
}
}
}
class ThreadC implements Runnable {
public void printQueue(ArrayBlockingQueue<Integer> printQueue) {
int len = printQueue.size();
System.out.println("Queen Size :" + printQueue.size());
for (int i = 0; i < len; i++) {
System.out.print(printQueue.poll());
}
System.out.println();
}
@Override
public void run() {
while (true) {
lock.lock();
try {
System.out.println("I'm thread C " + new Date());
conC.await();
System.out.println("I'm thread C, and I wake up " + new Date());
if (readQueueA.size() > 0) {
System.out.print("OUTPUT: ");
printQueue(readQueueA);
conA.signal();
conB.signal();
} else {
System.out.println("No elements");
}
} catch (Exception ex) {
ex.printStackTrace();
} finally {
lock.unlock();
}
}
}
}
}
输出:
"C:\Program Files\Java\jdk1.8.0_181\bin\java.exe"
notice C: Fri Sep 21 10:07:35 CST 2018
I'm thread C Fri Sep 21 10:07:35 CST 2018
TimeOut Zzzzzzz: Fri Sep 21 10:07:40 CST 2018
I'm thread C, and I wake up Fri Sep 21 10:07:40 CST 2018
OUTPUT: Queen Size :3
012
I'm thread C Fri Sep 21 10:07:40 CST 2018
notice C: Fri Sep 21 10:07:40 CST 2018
RefreshB。。。。Fri Sep 21 10:07:40 CST 2018
I'm thread C, and I wake up Fri Sep 21 10:07:40 CST 2018
OUTPUT: Queen Size :3
345
I'm thread C Fri Sep 21 10:07:40 CST 2018
notice C: Fri Sep 21 10:07:40 CST 2018
RefreshB。。。。Fri Sep 21 10:07:40 CST 2018
I'm thread C, and I wake up Fri Sep 21 10:07:40 CST 2018
OUTPUT: Queen Size :3
678
I'm thread C Fri Sep 21 10:07:40 CST 2018
RefreshB。。。。Fri Sep 21 10:07:40 CST 2018
TimeOut Zzzzzzz: Fri Sep 21 10:07:45 CST 2018
I'm thread C, and I wake up Fri Sep 21 10:07:45 CST 2018
OUTPUT: Queen Size :1
9
I'm thread C Fri Sep 21 10:07:45 CST 2018
RefreshB。。。。Fri Sep 21 10:07:45 CST 2018
TimeOut Zzzzzzz: Fri Sep 21 10:07:50 CST 2018
I'm thread C, and I wake up Fri Sep 21 10:07:50 CST 2018
No elements
....
但是,输出不是我预期的,为什么一开始ThreadB outPut TimeOut Zzz?我觉得应该不会输出这个item.Because ThreadA wakeup ThreadC, The下一步应该执行 ThreadC 和 Thread 可以 运行 在短时间内完成而不超过 5s.Can 有人帮我解释或修复它吗?谢谢!
预期:
notice C: Fri Sep 21 10:28:31 CST 2018
I'm thread C Fri Sep 21 10:28:**31** CST 2018
I'm thread C, and I wake up Fri Sep 21 10:28:31 CST 2018
OUTPUT: Queen Size :3
012
I'm thread C Fri Sep 21 10:28:31 CST 2018
notice C: Fri Sep 21 10:28:31 CST 2018
RefreshB。。。。Fri Sep 21 10:28:31 CST 2018
...
以下是您发布的特定场景中发生的情况:A 启动,并运行 while 循环,直到它向 C 发出信号并等待。然后C启动等待,问题就出现在这里; A signaled before C开始等待,所以那个signal调用就丢失了,现在A和C都在等待。所以此时,控制台上显示的是
通知 C ...
我是线程 C...
现在 B 启动,并等待整整 5 秒,因为没有其他线程可用于向它发出信号。因此,conB.await(5, TimeUnit.SECONDS)
returns false,它打印 Timeout Zzzzzzz:
,然后 then 向 C 发出信号。这就是为什么事情看起来不正常,只是一个信号当另一个线程已经正在等待时生效!
要解决此问题,请尝试更改
conC.await();
System.out.println("I'm thread C, and I wake up " + new Date());
至
if (readQueueA.size() < 3) {
conC.await();
System.out.println("I'm thread C, and I wake up " + new Date());
}
这样,如果它正在等待的条件已经满足,C 就不会等待。
背景:我有三个线程。 ThreadA负责向队列中写入元素,如果队列已满,则通知ThreadC从队列中读取元素。 ThreadB是另一种情况,如果队列没有满,但是时间超过5秒,那么Thread通知threadC从队列中取元素,最后,ThreadC通知ThreadB刷新它的时间戳。
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
public class Main {
private Lock lock = new ReentrantLock();
private Condition conA = lock.newCondition();
private Condition conB = lock.newCondition();
private Condition conC = lock.newCondition();
ArrayBlockingQueue<Integer> readQueueA = new ArrayBlockingQueue<>(3);
public static void main(String[] args) {
Main main1 = new Main();
try {
ThreadA threadWrite = main1.new ThreadA();
Thread threadOut = new Thread(threadWrite);
threadOut.start();
ThreadB threadB = main1.new ThreadB();
Thread threadBB = new Thread(threadB);
threadBB.start();
ThreadC threadRead = main1.new ThreadC();
Thread threadIn = new Thread(threadRead);
threadIn.start();
} catch (Exception ex) {
ex.printStackTrace();
}
}
class ThreadA implements Runnable {
public void addElement(Integer i) {
try {
readQueueA.put(i);
} catch (Exception ex ){
ex.printStackTrace();
}
}
@Override
public void run() {
int i = 0;
while (i < 10) {
lock.lock();
try {
if (readQueueA.size() < 3) {
addElement(i++);
} else {
System.out.println("notice C: " + new Date());
conC.signal();
conA.await();
}
} catch (Exception ex) {
ex.printStackTrace();
} finally {
lock.unlock();
}
}
}
}
class ThreadB implements Runnable {
@Override
public void run() {
reSetStartTime();
while (true) {
lock.lock();
try {
if (!conB.await(5, TimeUnit.SECONDS)) {
System.out.println("Timeout Zzzzzzz: " + new Date());
conC.signal();
} else {
System.out.println("RefreshB。。。。" + new Date());
}
} catch (Exception ex) {
ex.printStackTrace();
} finally {
lock.unlock();
}
}
}
}
class ThreadC implements Runnable {
public void printQueue(ArrayBlockingQueue<Integer> printQueue) {
int len = printQueue.size();
System.out.println("Queen Size :" + printQueue.size());
for (int i = 0; i < len; i++) {
System.out.print(printQueue.poll());
}
System.out.println();
}
@Override
public void run() {
while (true) {
lock.lock();
try {
System.out.println("I'm thread C " + new Date());
conC.await();
System.out.println("I'm thread C, and I wake up " + new Date());
if (readQueueA.size() > 0) {
System.out.print("OUTPUT: ");
printQueue(readQueueA);
conA.signal();
conB.signal();
} else {
System.out.println("No elements");
}
} catch (Exception ex) {
ex.printStackTrace();
} finally {
lock.unlock();
}
}
}
}
}
输出:
"C:\Program Files\Java\jdk1.8.0_181\bin\java.exe"
notice C: Fri Sep 21 10:07:35 CST 2018
I'm thread C Fri Sep 21 10:07:35 CST 2018
TimeOut Zzzzzzz: Fri Sep 21 10:07:40 CST 2018
I'm thread C, and I wake up Fri Sep 21 10:07:40 CST 2018
OUTPUT: Queen Size :3
012
I'm thread C Fri Sep 21 10:07:40 CST 2018
notice C: Fri Sep 21 10:07:40 CST 2018
RefreshB。。。。Fri Sep 21 10:07:40 CST 2018
I'm thread C, and I wake up Fri Sep 21 10:07:40 CST 2018
OUTPUT: Queen Size :3
345
I'm thread C Fri Sep 21 10:07:40 CST 2018
notice C: Fri Sep 21 10:07:40 CST 2018
RefreshB。。。。Fri Sep 21 10:07:40 CST 2018
I'm thread C, and I wake up Fri Sep 21 10:07:40 CST 2018
OUTPUT: Queen Size :3
678
I'm thread C Fri Sep 21 10:07:40 CST 2018
RefreshB。。。。Fri Sep 21 10:07:40 CST 2018
TimeOut Zzzzzzz: Fri Sep 21 10:07:45 CST 2018
I'm thread C, and I wake up Fri Sep 21 10:07:45 CST 2018
OUTPUT: Queen Size :1
9
I'm thread C Fri Sep 21 10:07:45 CST 2018
RefreshB。。。。Fri Sep 21 10:07:45 CST 2018
TimeOut Zzzzzzz: Fri Sep 21 10:07:50 CST 2018
I'm thread C, and I wake up Fri Sep 21 10:07:50 CST 2018
No elements
....
但是,输出不是我预期的,为什么一开始ThreadB outPut TimeOut Zzz?我觉得应该不会输出这个item.Because ThreadA wakeup ThreadC, The下一步应该执行 ThreadC 和 Thread 可以 运行 在短时间内完成而不超过 5s.Can 有人帮我解释或修复它吗?谢谢!
预期:
notice C: Fri Sep 21 10:28:31 CST 2018
I'm thread C Fri Sep 21 10:28:**31** CST 2018
I'm thread C, and I wake up Fri Sep 21 10:28:31 CST 2018
OUTPUT: Queen Size :3
012
I'm thread C Fri Sep 21 10:28:31 CST 2018
notice C: Fri Sep 21 10:28:31 CST 2018
RefreshB。。。。Fri Sep 21 10:28:31 CST 2018
...
以下是您发布的特定场景中发生的情况:A 启动,并运行 while 循环,直到它向 C 发出信号并等待。然后C启动等待,问题就出现在这里; A signaled before C开始等待,所以那个signal调用就丢失了,现在A和C都在等待。所以此时,控制台上显示的是
通知 C ...
我是线程 C...
现在 B 启动,并等待整整 5 秒,因为没有其他线程可用于向它发出信号。因此,conB.await(5, TimeUnit.SECONDS)
returns false,它打印 Timeout Zzzzzzz:
,然后 then 向 C 发出信号。这就是为什么事情看起来不正常,只是一个信号当另一个线程已经正在等待时生效!
要解决此问题,请尝试更改
conC.await();
System.out.println("I'm thread C, and I wake up " + new Date());
至
if (readQueueA.size() < 3) {
conC.await();
System.out.println("I'm thread C, and I wake up " + new Date());
}
这样,如果它正在等待的条件已经满足,C 就不会等待。