何时以及如何使用 wait() 和 notify()
When and How to use wait() and notify()
我从 SO 找到了这个例子。现在,我试图了解 wait()
和 notify()/notifyAll()
的用法。在什么情况下以及为什么我们需要这个。
class BlockingQueue<T> {
private Queue<T> queue = new LinkedList<T>();
private int capacity;
public BlockingQueue(int capacity) {
this.capacity = capacity;
}
public synchronized void put(T element) throws InterruptedException {
while (queue.size() == capacity) {
System.out.println("Waiting...");
wait();
}
queue.add(element);
notify(); // notifyAll() for multiple producer/consumer threads
}
public synchronized T take() throws InterruptedException {
while (queue.isEmpty()) {
wait();
}
T item = queue.remove();
notify(); // notifyAll() for multiple producer/consumer threads
return item;
}
}
因此,实现了 Runnable
并重写了 run()
方法,如下所示
@Override
public void run() {
// synchronized (this) {
BlockingQueue<Integer> s = new BlockingQueue(10);
for (int i = 0; i < 12; i++) {
try {
s.put(i);
if (i > 9) {
System.out.println(Thread.currentThread().getName() + " : " + s.take());
}
System.out.println(Thread.currentThread().getName() + " ExtendsThread : Counter : " + i);
} //}
//notify();
catch (InterruptedException ex) {
Logger.getLogger(ExtendsThread.class.getName()).log(Level.SEVERE, null, ex);
}
}
}
而且,运行 像下面这样的线程
ImplementsRunnable rc = new ImplementsRunnable();
Thread t1 = new Thread(rc, "A");
t1.start();
当我运行它时,它在counter : 9
之后就卡住了,一直等下去。有人告诉我这里有什么问题吗?
你的想法有点问题。 BlockingQueue
可以充当 producer/consumer 模式中的桥梁。
也就是说,它允许一个线程向其中写入内容,而另一个线程从中读取内容,但这样做的方式是:
- 没有物品可取,等待新物品到达
- 如果项目太多,它会等待删除项目
在这种情况下,wait
和 notify
是 BlockingQueue
实例的内部消息传递
你可以看看Intrinsic Locks and Synchronization。
因此,与其只使用一个线程,不如使用(至少)两个线程,一个生产者和一个消费者...
制作人
这需要一个 BlockingQueue
的实例并向其添加 int
值。每次停1秒再加入下一个
public class Producer implements Runnable {
private BlockingQueue<Integer> queue;
public Producer(BlockingQueue<Integer> queue) {
this.queue = queue;
}
@Override
public void run() {
for (int index = 0; index < 10; index++) {
try {
System.out.println("Put " + index);
queue.put(index);
Thread.sleep(1000);
} catch (InterruptedException ex) {
}
}
}
}
消费者
消费者获取 BlockQueue
并从中读取 int
值,这些值将被阻塞,直到值存在。
public class Consumer implements Runnable {
private BlockingQueue<Integer> queue;
public Consumer(BlockingQueue<Integer> queue) {
this.queue = queue;
}
@Override
public void run() {
try {
while (true) {
Integer value = queue.take();
System.out.println("Took " + value);
}
} catch (InterruptedException ex) {
Logger.getLogger(JavaApplication220.class.getName()).log(Level.SEVERE, null, ex);
}
}
}
您可以开始使用类似...
BlockingQueue bq = new BlockingQueue(10);
Thread p = new Thread(new Producer(bq));
Thread c = new Thread(new Consumer(bq));
c.setDaemon(true);
c.start();
p.start();
您应该注意到 put
消息之间有一个小的延迟,但 took
消息之间几乎没有延迟。这是正在运行的队列。 Consumer
在队列中 blocking/waiting 有东西要给它。
您可以尝试 Producer
和 Consumer
,也许可以更改它们的时间(例如,在 Consumer
中有更长的延迟,然后再拿一个项目)看看这如何可能会造成不同的影响
When I'm running it, then it is stuck after counter : 9 and keep on waiting for forever
这很可能是因为您已经超出了队列的容量,并且它的 put
方法一直处于阻塞状态,直到您从中取出一些东西(您实际上有一个死锁,队列正在等待您从中获取一些东西,但你不能,因为你被锁定在 put
)
要记住的事情:
- 要使两个或多个线程使用监视器锁,它们必须共享同一个 monitor/object 锁实例。在这种情况下,
BlockingQueue
的相同实例
notify
将唤醒一个对象,该对象正在等待监视器锁的 wait
方法的同一实例。没有办法知道是哪一个。如果您有多个消费者,但不关心数据处理的顺序,这可能很有用,例如
更新了附加示例
因此,这会从 Producer
中取出 Thread.sleep
(并允许生产者生成 100 个值)并向 Consumer
添加一个 Thread.sleep
。
这样,Producer
将在 Consumer
耗尽它之前达到其容量,迫使它等待 Consumer
可以从中获取值...
public class Producer implements Runnable {
private BlockingQueue<Integer> queue;
public Producer(BlockingQueue<Integer> queue) {
this.queue = queue;
}
@Override
public void run() {
for (int index = 0; index < 100; index++) {
try {
System.out.println("Put " + index);
queue.put(index);
} catch (InterruptedException ex) {
}
}
}
}
public class Consumer implements Runnable {
private BlockingQueue<Integer> queue;
public Consumer(BlockingQueue<Integer> queue) {
this.queue = queue;
}
@Override
public void run() {
try {
while (true) {
Integer value = queue.take();
System.out.println("Took " + value);
Thread.sleep(1000);
}
} catch (InterruptedException ex) {
}
}
}
在此处添加 printlns
public synchronized void put(T element) throws InterruptedException {
while (queue.size() == capacity) {
System.out.println("blocked");
wait();
}
queue.add(element);
notify(); // notifyAll() for multiple producer/consumer threads
System.out.println("put "+ element);
}
public synchronized T take() throws InterruptedException {
while (queue.isEmpty()) {
wait();
}
T item = queue.remove();
notify(); // notifyAll() for multiple producer/consumer threads
System.out.println("removed " + item);
return item;
}
和运行本次测试
public static void main(String argv[]) throws Exception {
final BlockingQueue q = new BlockingQueue(2);
new Thread() {
public void run() {
try {
Thread.sleep(5000);
q.take();
} catch (Exception e) {
e.printStackTrace();
}
};
}.start();
q.put(1);
q.put(2); // will block here until tread 2 takes an element and reduces the capacity
q.put(3);
}
它将打印
put 1
put 2
blocked
removed 1
put 3
我从 SO 找到了这个例子。现在,我试图了解 wait()
和 notify()/notifyAll()
的用法。在什么情况下以及为什么我们需要这个。
class BlockingQueue<T> {
private Queue<T> queue = new LinkedList<T>();
private int capacity;
public BlockingQueue(int capacity) {
this.capacity = capacity;
}
public synchronized void put(T element) throws InterruptedException {
while (queue.size() == capacity) {
System.out.println("Waiting...");
wait();
}
queue.add(element);
notify(); // notifyAll() for multiple producer/consumer threads
}
public synchronized T take() throws InterruptedException {
while (queue.isEmpty()) {
wait();
}
T item = queue.remove();
notify(); // notifyAll() for multiple producer/consumer threads
return item;
}
}
因此,实现了 Runnable
并重写了 run()
方法,如下所示
@Override
public void run() {
// synchronized (this) {
BlockingQueue<Integer> s = new BlockingQueue(10);
for (int i = 0; i < 12; i++) {
try {
s.put(i);
if (i > 9) {
System.out.println(Thread.currentThread().getName() + " : " + s.take());
}
System.out.println(Thread.currentThread().getName() + " ExtendsThread : Counter : " + i);
} //}
//notify();
catch (InterruptedException ex) {
Logger.getLogger(ExtendsThread.class.getName()).log(Level.SEVERE, null, ex);
}
}
}
而且,运行 像下面这样的线程
ImplementsRunnable rc = new ImplementsRunnable();
Thread t1 = new Thread(rc, "A");
t1.start();
当我运行它时,它在counter : 9
之后就卡住了,一直等下去。有人告诉我这里有什么问题吗?
你的想法有点问题。 BlockingQueue
可以充当 producer/consumer 模式中的桥梁。
也就是说,它允许一个线程向其中写入内容,而另一个线程从中读取内容,但这样做的方式是:
- 没有物品可取,等待新物品到达
- 如果项目太多,它会等待删除项目
在这种情况下,wait
和 notify
是 BlockingQueue
你可以看看Intrinsic Locks and Synchronization。
因此,与其只使用一个线程,不如使用(至少)两个线程,一个生产者和一个消费者...
制作人
这需要一个 BlockingQueue
的实例并向其添加 int
值。每次停1秒再加入下一个
public class Producer implements Runnable {
private BlockingQueue<Integer> queue;
public Producer(BlockingQueue<Integer> queue) {
this.queue = queue;
}
@Override
public void run() {
for (int index = 0; index < 10; index++) {
try {
System.out.println("Put " + index);
queue.put(index);
Thread.sleep(1000);
} catch (InterruptedException ex) {
}
}
}
}
消费者
消费者获取 BlockQueue
并从中读取 int
值,这些值将被阻塞,直到值存在。
public class Consumer implements Runnable {
private BlockingQueue<Integer> queue;
public Consumer(BlockingQueue<Integer> queue) {
this.queue = queue;
}
@Override
public void run() {
try {
while (true) {
Integer value = queue.take();
System.out.println("Took " + value);
}
} catch (InterruptedException ex) {
Logger.getLogger(JavaApplication220.class.getName()).log(Level.SEVERE, null, ex);
}
}
}
您可以开始使用类似...
BlockingQueue bq = new BlockingQueue(10);
Thread p = new Thread(new Producer(bq));
Thread c = new Thread(new Consumer(bq));
c.setDaemon(true);
c.start();
p.start();
您应该注意到 put
消息之间有一个小的延迟,但 took
消息之间几乎没有延迟。这是正在运行的队列。 Consumer
在队列中 blocking/waiting 有东西要给它。
您可以尝试 Producer
和 Consumer
,也许可以更改它们的时间(例如,在 Consumer
中有更长的延迟,然后再拿一个项目)看看这如何可能会造成不同的影响
When I'm running it, then it is stuck after counter : 9 and keep on waiting for forever
这很可能是因为您已经超出了队列的容量,并且它的 put
方法一直处于阻塞状态,直到您从中取出一些东西(您实际上有一个死锁,队列正在等待您从中获取一些东西,但你不能,因为你被锁定在 put
)
要记住的事情:
- 要使两个或多个线程使用监视器锁,它们必须共享同一个 monitor/object 锁实例。在这种情况下,
BlockingQueue
的相同实例
notify
将唤醒一个对象,该对象正在等待监视器锁的wait
方法的同一实例。没有办法知道是哪一个。如果您有多个消费者,但不关心数据处理的顺序,这可能很有用,例如
更新了附加示例
因此,这会从 Producer
中取出 Thread.sleep
(并允许生产者生成 100 个值)并向 Consumer
添加一个 Thread.sleep
。
这样,Producer
将在 Consumer
耗尽它之前达到其容量,迫使它等待 Consumer
可以从中获取值...
public class Producer implements Runnable {
private BlockingQueue<Integer> queue;
public Producer(BlockingQueue<Integer> queue) {
this.queue = queue;
}
@Override
public void run() {
for (int index = 0; index < 100; index++) {
try {
System.out.println("Put " + index);
queue.put(index);
} catch (InterruptedException ex) {
}
}
}
}
public class Consumer implements Runnable {
private BlockingQueue<Integer> queue;
public Consumer(BlockingQueue<Integer> queue) {
this.queue = queue;
}
@Override
public void run() {
try {
while (true) {
Integer value = queue.take();
System.out.println("Took " + value);
Thread.sleep(1000);
}
} catch (InterruptedException ex) {
}
}
}
在此处添加 printlns
public synchronized void put(T element) throws InterruptedException {
while (queue.size() == capacity) {
System.out.println("blocked");
wait();
}
queue.add(element);
notify(); // notifyAll() for multiple producer/consumer threads
System.out.println("put "+ element);
}
public synchronized T take() throws InterruptedException {
while (queue.isEmpty()) {
wait();
}
T item = queue.remove();
notify(); // notifyAll() for multiple producer/consumer threads
System.out.println("removed " + item);
return item;
}
和运行本次测试
public static void main(String argv[]) throws Exception {
final BlockingQueue q = new BlockingQueue(2);
new Thread() {
public void run() {
try {
Thread.sleep(5000);
q.take();
} catch (Exception e) {
e.printStackTrace();
}
};
}.start();
q.put(1);
q.put(2); // will block here until tread 2 takes an element and reduces the capacity
q.put(3);
}
它将打印
put 1
put 2
blocked
removed 1
put 3