使用 java 同步了解生产者-消费者

Understanding Producer-Consumer using java synchronization

我一直在研究PC问题,以了解Java同步和线程间通信。使用底部的代码,输出为

Producer produced-0
Producer produced-1
Producer produced-2
Consumer consumed-0
Consumer consumed-1
Consumer consumed-2
Producer produced-3
Producer produced-4
Producer produced-5
Consumer consumed-3
Consumer consumed-4

但是输出不应该像下面这样

Producer produced-0
Consumer consumed-0
Producer produced-1
Consumer consumed-1
Producer produced-2
Consumer consumed-2
Producer produced-3

我期待这样的输出,因为我的理解是,当 produce 方法终止时,一旦 produce 方法释放锁,消费者就会收到所产生的值的通知。结果,正在等待的消费者块进入同步状态获取锁以消费产生的值,同时生产者方法被阻塞。此锁在 consume 方法结束时释放,该方法由生产者线程获取,该线程由于同步而被阻塞,并且循环继续,因为每个方法由于获取的锁而被阻塞。

请让我知道我误解了什么?谢谢

package MultiThreading;

//Java program to implement solution of producer
//consumer problem.
import java.util.LinkedList;

public class PCExample2
{
 public static void main(String[] args)
                     throws InterruptedException
 {
     // Object of a class that has both produce()
     // and consume() methods
     final PC pc = new PC();

     // Create producer thread
     Thread t1 = new Thread(new Runnable()
     {
         @Override
         public void run()
         {
             try
             {
                 while (true) {
                     pc.produce();   
                 }                 
             }
             catch(InterruptedException e)
             {
                 e.printStackTrace();
             }
         }
     });

     // Create consumer thread
     Thread t2 = new Thread(new Runnable()
     {
         @Override
         public void run()
         {
             try
             {
                 while (true) {
                     pc.consume();   
                 }
             }
             catch(InterruptedException e)
             {
                 e.printStackTrace();
             }
         }
     });

     // Start both threads
     t1.start();
     t2.start();

     // t1 finishes before t2
     t1.join();
     t2.join();
 }

 // This class has a list, producer (adds items to list
 // and consumber (removes items).
 public static class PC
 {
     // Create a list shared by producer and consumer
     // Size of list is 2.
     LinkedList<Integer> list = new LinkedList<>();
     int capacity = 12;
     int value = 0;

     // Function called by producer thread
     public void produce() throws InterruptedException
     {         
         synchronized (this)
         {
             // producer thread waits while list
             // is full
             while (list.size()==capacity)
                 wait();

             System.out.println("Producer produced-"
                                           + value);

             // to insert the jobs in the list
             list.add(value++);

             // notifies the consumer thread that
             // now it can start consuming
             notify();

             // makes the working of program easier
             // to  understand
             Thread.sleep(1000);
         }
     }

     // Function called by consumer thread
     public void consume() throws InterruptedException
     {
         synchronized (this)
         {
             // consumer thread waits while list
             // is empty
             while (list.size()==0)
                 wait();

             //to retrive the ifrst job in the list
             int val = list.removeFirst();


             System.out.println("Consumer consumed-"
                                             + val);

             // Wake up producer thread
             notify();

             // and sleep
             Thread.sleep(1000);
         }
     }
 }
}

注意两个方法:notify && Thread.sleep

Object.notify():

Wakes up a single thread that is waiting on this object's monitor. If any threads are waiting on this object, one of them is chosen to be awakened. The choice is arbitrary and occurs at the discretion of the implementation. A thread waits on an object's monitor by calling one of the wait methods.
The awakened thread will not be able to proceed until the current thread relinquishes the lock on this object. The awakened thread will compete in the usual manner with any other threads that might be actively competing to synchronize on this object; for example, the awakened thread enjoys no reliable privilege or disadvantage in being the next thread to lock this object.

Thread.sleep():

Causes the currently executing thread to sleep (temporarily cease execution) for the specified number of milliseconds plus the specified number of nanoseconds, subject to the precision and accuracy of system timers and schedulers. The thread does not lose ownership of any monitors.

好的。现在你知道 notify 只会唤醒一个线程,它也监视这个对象,但被唤醒的线程将竞争同步这个对象。如果你的生产者通知消费者并释放了锁,那么生产者和消费者就站在同一个点上竞争了。而且 Thread.sleep 不做你想要的工作,它不会像文档所说的那样在睡眠时释放锁。所以这可能会发生。

总而言之,Thread.sleep 同步效果不是很好。即使您删除了它,由于通知机制,第一个输出也会发生。

@Andrew S 的回答有效。

来自API:被唤醒的线程将以通常的方式与可能正在积极竞争同步此对象的任何其他线程进行竞争;例如,被唤醒的线程在成为下一个锁定该对象的线程时不享有可靠的特权或劣势。

sleep() 移到同步块之外,让其他线程有机会获得锁。

第一个调用当前获取的锁的线程(我们称之为线程 A)不一定会在锁的当前所有者线程放弃锁时立即获取锁,如果其他线程自从线程 A 试图获取它以来,也已经调用了锁。没有有序"queue"。参见 here and here。因此,从程序的输出来看,似乎在生产者释放锁之后,在生产者线程中重复 while 循环之前,消费者可能没有足够的时间来获取锁,并且生产者线程再次调用锁(其他答案已经指出, Thread.sleep() 不会导致休眠线程放弃锁),如果消费者不幸,生产者将重新获得锁,即使消费者先到那里。

不过,好像还有一个误会。在列表包含 12 个元素之前,生产者线程永远不会在 PC 上 "wait",因此只有在生产者至少生产了 12 个元素(顺便说一下,当我 运行 程序时会发生什么——消费者永远不会有机会,直到生产者线程在 PC 上调用 wait(),但随后,它会消耗整个列表)。这也意味着,如果恰好轮到消费者并且列表包含的元素少于12个,生产者线程将不会被通知,因为它不是waiting被通知,而只是blocked 并且已经,让我们说 "anticipating" 或 "expecting" PC 上的锁(另见 here 关于 "waiting" 和 "waiting" 之间的区别"blocked")。所以即使你把两个 Thread.sleep() 调用放在同步块之外,从而给消费者线程(希望你不应该依赖这个)足够的时间来获取锁,调用 notify() 来自消费者线程不会有任何影响,因为生产者线程永远不会处于等待状态。

要真正确保两个线程交替修改 PC,您必须让生产者线程仅在列表大小大于零时等待,而不是列表包含 12(或者但是许多)元素。

只需添加适当的条件即可。

import java.util.LinkedList;
import java.util.Queue;

class Producer extends Thread {

    public Queue<Integer> producerQueue;
    public int size;
    public int count = 0;

    Producer(Queue<Integer> queue, int size) {
        producerQueue = queue;
        this.size = size;
    }

    public void produce() throws InterruptedException {
        synchronized (producerQueue) {
            while (producerQueue.size() > 0) {
                producerQueue.wait();
            }
            System.out.println("Produced : " + count);
            producerQueue.add(count++);
            producerQueue.notify();
            Thread.sleep(100);
        }
    }

    public void run() {
        try {
            while (true) produce();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}


class Consumer extends Thread {
    public Queue<Integer> consumerQueue;
    public int size;

    Consumer(Queue<Integer> queue, int size) {
        consumerQueue = queue;
        this.size = size;
    }

    public void consume() throws InterruptedException {
        synchronized (consumerQueue) {
            while (consumerQueue.size() == 0) {
                consumerQueue.wait();
                Thread.sleep(100);
            }
            System.out.println("Consumed : " + consumerQueue.poll());
            consumerQueue.notify();
        }
    }

    public void run() {
        try {
            while (true) consume();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

}


public class Test {

    public static void main(String[] args) {
        Queue<Integer> commonQueue = new LinkedList<>();
        int size = 10;
        new Producer(commonQueue, size).start();
        new Consumer(commonQueue, size).start();
    }
}