Java Producer Consumer模型总是死锁
Java Producer Consumer model always deadlocking
所以我有这个任务来制作一个生产者消费者模型作为作业,我完成了一个非常粗糙的版本(但我目前的 java 技能可以做到最好)。
它似乎有效,但它 运行 陷入了死锁问题 http://en.wikipedia.org/wiki/Producer%E2%80%93consumer_problem
Wiki link 描述的,基本上是出于某种原因最终所有线程都进入睡眠状态并且无法相互唤醒进入永恒的睡眠周期。
我不太确定我的代码中到底是什么导致了这种情况,因为我认为我编写它的方式不会发生这种情况,但话又说回来,我仍然不能 100% 理解线程是如何工作的.
这是我的代码:
package boundedbuffer;
import java.util.LinkedList;
import java.util.Random;
import java.util.logging.Level;
import java.util.logging.Logger;
import java.util.Queue;
public class BoundedBuffer {
public static int CapacityCheck = 0;
public static void main(String[] args){
MessageQueue queue = new MessageQueue(3); // <-- max capacity of queue is given here as 3
Thread t1 = new Thread(new Producer(queue));
Thread t2 = new Thread(new Producer(queue));
Thread t3 = new Thread(new Producer(queue));
Thread t4 = new Thread(new Consumer(queue));
Thread t5 = new Thread(new Consumer(queue));
Thread t6 = new Thread(new Consumer(queue));
t1.start();
t2.start();
t3.start();
t4.start();
t5.start();
t6.start();
}
}
public class Producer implements Runnable{
private MessageQueue queue;
private static String msgs[] = {
"some test message",
"long message",
"short message",
"yet another message"
};
public Producer(MessageQueue queue){
this.queue = queue;
}
@Override
public synchronized void run() {
while(true){
Random rand = new Random();
int wait = rand.nextInt(3000);
int index = rand.nextInt(4);
try {
Thread.sleep(wait);
} catch (InterruptedException ex) {
Logger.getLogger(Producer.class.getName()).log(Level.SEVERE,
null, ex);
}
if(BoundedBuffer.CapacityCheck < queue.capacity){
System.out.println("Puts into buffer: " + msgs[index]);
queue.put(msgs[index]);
BoundedBuffer.CapacityCheck++;
notifyAll();
}else{
try {
wait();
} catch (InterruptedException ex) {
Logger.getLogger(Producer.class.getName()).log(Level.SEVERE, null, ex);
}
}
}
}
}
public class Consumer implements Runnable{
private MessageQueue queue;
public Consumer(MessageQueue queue){
this.queue = queue;
}
@Override
public synchronized void run() {
while(true){
Random rand = new Random();
int wait = rand.nextInt(3000);
try {
Thread.sleep(wait);
} catch (InterruptedException ex) {
Logger.getLogger(Consumer.class.getName()).log(Level.SEVERE, null, ex);
}
String msg = queue.get();
if(msg == null){
try {
wait();
} catch (InterruptedException ex) {
Logger.getLogger(Consumer.class.getName()).log(Level.SEVERE, null, ex);
}
}
queue.get();
BoundedBuffer.CapacityCheck--;
System.out.println("Takes out of buffer: " + msg);
notifyAll();
}
}
}
public class MessageQueue {
public final int capacity;
private final Queue<String> messages = new LinkedList<>();
public MessageQueue(int capacity) {
this.capacity = capacity;
}
public void put(String msg){
this.messages.add(msg);
}
public String get(){
if(messages.isEmpty()){
return null;
}else{
String msg = messages.element();
messages.remove();
return msg;
}
}
}
另一个小但有趣的问题是,我从来没有或者可能只见过一次 "taking an item out" 发生不止一次的情况。每次放入物品总是一次、两次或最多三次(我在这个例子中将缓冲区大小设置为 3,所以它不会发生 4 次)但取出一个物品可能只发生一次,然后再发生一次总是放一个回去,拿出一个,放一个回去。我从来没有见过 3 件物品放入后:例如取出一件,再取出一件。
这可能是个问题或错误。同上。
我还认为在 运行 方法上使用 Synchronized 感觉有点不对劲,但如果我将其删除,则会出现 IllegalMonitorState 异常。
我正在使用多个生产者和多个消费者,因为我的老师要求我们这样做。
所有线程停顿都是因为您正在获取传递给线程的不同生产者和消费者的互斥。
您在 运行 方法上进行同步,这意味着在调用 wait
方法并进入阻塞状态时获取不同对象上的互斥锁,假设有人会通知线程返回。即使其他线程通知他们通知这个(单独的生产者或消费者)实例而不是生产者和消费者之间的共享实例。
像您一样共享公共实例 MessageQueue
并在队列上同步而不是在 run
方法上同步。
所以我有这个任务来制作一个生产者消费者模型作为作业,我完成了一个非常粗糙的版本(但我目前的 java 技能可以做到最好)。
它似乎有效,但它 运行 陷入了死锁问题 http://en.wikipedia.org/wiki/Producer%E2%80%93consumer_problem Wiki link 描述的,基本上是出于某种原因最终所有线程都进入睡眠状态并且无法相互唤醒进入永恒的睡眠周期。
我不太确定我的代码中到底是什么导致了这种情况,因为我认为我编写它的方式不会发生这种情况,但话又说回来,我仍然不能 100% 理解线程是如何工作的.
这是我的代码:
package boundedbuffer;
import java.util.LinkedList;
import java.util.Random;
import java.util.logging.Level;
import java.util.logging.Logger;
import java.util.Queue;
public class BoundedBuffer {
public static int CapacityCheck = 0;
public static void main(String[] args){
MessageQueue queue = new MessageQueue(3); // <-- max capacity of queue is given here as 3
Thread t1 = new Thread(new Producer(queue));
Thread t2 = new Thread(new Producer(queue));
Thread t3 = new Thread(new Producer(queue));
Thread t4 = new Thread(new Consumer(queue));
Thread t5 = new Thread(new Consumer(queue));
Thread t6 = new Thread(new Consumer(queue));
t1.start();
t2.start();
t3.start();
t4.start();
t5.start();
t6.start();
}
}
public class Producer implements Runnable{
private MessageQueue queue;
private static String msgs[] = {
"some test message",
"long message",
"short message",
"yet another message"
};
public Producer(MessageQueue queue){
this.queue = queue;
}
@Override
public synchronized void run() {
while(true){
Random rand = new Random();
int wait = rand.nextInt(3000);
int index = rand.nextInt(4);
try {
Thread.sleep(wait);
} catch (InterruptedException ex) {
Logger.getLogger(Producer.class.getName()).log(Level.SEVERE,
null, ex);
}
if(BoundedBuffer.CapacityCheck < queue.capacity){
System.out.println("Puts into buffer: " + msgs[index]);
queue.put(msgs[index]);
BoundedBuffer.CapacityCheck++;
notifyAll();
}else{
try {
wait();
} catch (InterruptedException ex) {
Logger.getLogger(Producer.class.getName()).log(Level.SEVERE, null, ex);
}
}
}
}
}
public class Consumer implements Runnable{
private MessageQueue queue;
public Consumer(MessageQueue queue){
this.queue = queue;
}
@Override
public synchronized void run() {
while(true){
Random rand = new Random();
int wait = rand.nextInt(3000);
try {
Thread.sleep(wait);
} catch (InterruptedException ex) {
Logger.getLogger(Consumer.class.getName()).log(Level.SEVERE, null, ex);
}
String msg = queue.get();
if(msg == null){
try {
wait();
} catch (InterruptedException ex) {
Logger.getLogger(Consumer.class.getName()).log(Level.SEVERE, null, ex);
}
}
queue.get();
BoundedBuffer.CapacityCheck--;
System.out.println("Takes out of buffer: " + msg);
notifyAll();
}
}
}
public class MessageQueue {
public final int capacity;
private final Queue<String> messages = new LinkedList<>();
public MessageQueue(int capacity) {
this.capacity = capacity;
}
public void put(String msg){
this.messages.add(msg);
}
public String get(){
if(messages.isEmpty()){
return null;
}else{
String msg = messages.element();
messages.remove();
return msg;
}
}
}
另一个小但有趣的问题是,我从来没有或者可能只见过一次 "taking an item out" 发生不止一次的情况。每次放入物品总是一次、两次或最多三次(我在这个例子中将缓冲区大小设置为 3,所以它不会发生 4 次)但取出一个物品可能只发生一次,然后再发生一次总是放一个回去,拿出一个,放一个回去。我从来没有见过 3 件物品放入后:例如取出一件,再取出一件。
这可能是个问题或错误。同上。
我还认为在 运行 方法上使用 Synchronized 感觉有点不对劲,但如果我将其删除,则会出现 IllegalMonitorState 异常。
我正在使用多个生产者和多个消费者,因为我的老师要求我们这样做。
所有线程停顿都是因为您正在获取传递给线程的不同生产者和消费者的互斥。
您在 运行 方法上进行同步,这意味着在调用 wait
方法并进入阻塞状态时获取不同对象上的互斥锁,假设有人会通知线程返回。即使其他线程通知他们通知这个(单独的生产者或消费者)实例而不是生产者和消费者之间的共享实例。
像您一样共享公共实例 MessageQueue
并在队列上同步而不是在 run
方法上同步。