How to consume in Producer-Consumer using Semaphores?

I am trying to solve the Producer-Consumer problem using semaphores. The program looks fine to me except for one place.

import java.util.ArrayList;
import java.util.concurrent.Semaphore;

public class ProducerConsumerWithSemaphores
{
    private final ArrayList<Integer> list = new ArrayList<>(5);
    private final Semaphore semaphoreProducer = new Semaphore(1);
    private final Semaphore semaphoreConsumer = new Semaphore(0);

    private void produce() throws InterruptedException
    {
        for(int i = 0;i< 5;i++)
        {
            semaphoreProducer.acquire();
            list.add(i);
            System.out.println("Produced: " + i);
            semaphoreConsumer.release();
        }
    }

    private void consumer() throws InterruptedException
    {
        while (!list.isEmpty())    /// This line is where I have the doubt
        {
            semaphoreConsumer.acquire();
            System.out.println("Consumer: " + list.remove(list.size()-1));
            semaphoreProducer.release();
            Thread.sleep(100);
        }
    }

    public static void main(String[] args)
    {
        final ProducerConsumerWithSemaphores obj = new ProducerConsumerWithSemaphores();

        new Thread(new Runnable()
        {
            @Override
            public void run()
            {
                try
                {
                    obj.produce();
                } catch (InterruptedException e)
                {
                    e.printStackTrace();
                }
            }
        }).start();

        new Thread(new Runnable()
        {
            @Override
            public void run()
            {
                try
                {
                    obj.consumer();
                } catch (InterruptedException e)
                {
                    e.printStackTrace();
                }
            }
        }).start();
    }
}

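Is it OK to check whether the list is empty before acquiring the semaphore? Will this cause any problem in a multithreaded environment?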

private void consumer() throws InterruptedException
{
    while (!list.isEmpty())    /// This line is where I have the doubt

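The problem is that if the consumer runs faster than the producer, the list is still empty at the first check, so your consumer exits immediately and you have no consumer at all!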

A correct example looks like the one in "Producer–consumer problem#Using semaphores". I believe you don't want to use true as an endless loop condition because you want the producer and consumer to quit when the job is done. If that's your intention, you can:

1. Set a totalCount to end the loop.
2. Use a boolean flag that the producer sets after putItemIntoBuffer when it puts the last item. The flag must be protected just like the buffer. (Update: this method doesn't work if there are multiple producers/consumers.)
3. Simulate EOF (idea taken from "producer - consume; how does the consumer stop?").
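Method 2 has no sample code in this answer, so here is a minimal sketch of how it might look. It assumes exactly one producer and one consumer (the answer notes the approach breaks with more), and the names FlagStopDemo, done, and run are hypothetical. The flag is set in the same critical section that adds the last item, so the consumer can treat "flag set and buffer empty" as the signal to stop.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.Semaphore;

class FlagStopDemo {
    private final Semaphore mutex = new Semaphore(1);      // protects buffer and done
    private final Semaphore fillCount = new Semaphore(0);  // items available
    private final Semaphore emptyCount = new Semaphore(3); // free slots
    private final Queue<Integer> buffer = new ArrayDeque<>();
    private boolean done = false; // hypothetical flag, only touched while holding mutex

    // Runs one producer and one consumer; returns the items in consumption order.
    List<Integer> run(int totalTasks) {
        List<Integer> consumed = new ArrayList<>();
        if (totalTasks <= 0) {
            return consumed; // nothing to produce, so the flag would never be set
        }
        Thread producer = new Thread(() -> {
            try {
                for (int i = 0; i < totalTasks; i++) {
                    emptyCount.acquire();
                    mutex.acquire();
                    buffer.offer(i);
                    if (i == totalTasks - 1) {
                        done = true; // set together with the last item
                    }
                    mutex.release();
                    fillCount.release();
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        Thread consumer = new Thread(() -> {
            try {
                boolean finished = false;
                while (!finished) {
                    fillCount.acquire();
                    mutex.acquire();
                    consumed.add(buffer.poll());
                    // The item just taken was the last one if the producer
                    // has finished and nothing is left in the buffer.
                    finished = done && buffer.isEmpty();
                    mutex.release();
                    emptyCount.release();
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        producer.start();
        consumer.start();
        try {
            producer.join();
            consumer.join(); // after join, reading consumed from this thread is safe
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return consumed;
    }

    public static void main(String[] args) {
        System.out.println("Consumed: " + new FlagStopDemo().run(5));
        // prints "Consumed: [0, 1, 2, 3, 4]"
    }
}
```

The consumer never looks at the buffer before acquiring fillCount, which avoids the early-exit problem of while (!list.isEmpty()).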

Will this cause any problem in a multithreaded environment?

Your critical section (your list) is not protected. Usually we use 3 semaphores; the third one serves as a mutex to protect the buffer.

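To stop the producers/consumers, here is sample code for method 1. Note that the totals must match (3 producers × 10 tasks = 6 consumers × 5 tasks), otherwise some threads would block forever: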

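import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Semaphore;

public class Test3 {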

  private Semaphore mutex = new Semaphore(1);
  private Semaphore fillCount = new Semaphore(0);
  private Semaphore emptyCount = new Semaphore(3);

  private final List<Integer> list = new ArrayList<>();

  class Producer implements Runnable {

    private final int totalTasks;

    Producer(int totalTasks) {
      this.totalTasks = totalTasks;
    }

    @Override
    public void run() {
      try {
        for (int i = 0; i < totalTasks; i++) {
          emptyCount.acquire();
          mutex.acquire();
          list.add(i);
          System.out.println("Produced: " + i);
          mutex.release();
          fillCount.release();
        }
      } catch (InterruptedException e) {
        e.printStackTrace();
      }
    }
  }

  class Consumer implements Runnable {
    private final int totalTasks;

    Consumer(int totalTasks) {
      this.totalTasks = totalTasks;
    }

    @Override
    public void run() {
      try {
        for (int i = 0; i < totalTasks; i++) {
          fillCount.acquire();
          mutex.acquire();
          int item = list.remove(list.size() - 1);
          System.out.println("Consumed: " + item);
          mutex.release();
          emptyCount.release();
        }
      } catch (InterruptedException e) {
        e.printStackTrace();
      }
    }
  }

  public void runTest() {
    int numProducer = 3;
    int tasksPerProducer = 10;
    int numConsumer = 6;
    int tasksPerConsumer = 5;

    for (int i = 0; i < numProducer; i++) {
      new Thread(new Producer(tasksPerProducer)).start();
    }
    for (int i = 0; i < numConsumer; i++) {
      new Thread(new Consumer(tasksPerConsumer)).start();
    }
  }

  public static void main(String[] args) throws IOException {
    Test3 t = new Test3();
    t.runTest();
  }
}

Sample code using method 3:

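import java.io.IOException;
import java.util.LinkedList;
import java.util.Queue;
import java.util.concurrent.Semaphore;

public class Test4 {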

  private Semaphore mutex = new Semaphore(1);
  private Semaphore fillCount = new Semaphore(0);
  private Semaphore emptyCount = new Semaphore(3);

  private Integer EOF = Integer.MAX_VALUE;

  private final Queue<Integer> list = new LinkedList<>(); // need to put/get data in FIFO

  class Producer implements Runnable {

    private final int totalTasks;

    Producer(int totalTasks) {
      this.totalTasks = totalTasks;
    }

    @Override
    public void run() {
      try {
        for (int i = 0; i < totalTasks + 1; i++) {
          emptyCount.acquire();
          mutex.acquire();
          if (i == totalTasks) {
            list.offer(EOF);
          } else {
            // add a valid value
            list.offer(i);
            System.out.println("Produced: " + i);
          }
          mutex.release();
          fillCount.release();
        }
      } catch (InterruptedException e) {
        e.printStackTrace();
      }
    }
  }

  class Consumer implements Runnable {

    @Override
    public void run() {
      try {
        boolean finished = false;
        while (!finished) {
          fillCount.acquire();
          mutex.acquire();
          int item = list.poll();
          if (EOF.equals(item)) {
            // do not consume this item because it means EOF
            finished = true;
          } else {
            // it's a valid value, consume it.
            System.out.println("Consumed: " + item);
          }
          mutex.release();
          emptyCount.release();
        }
      } catch (InterruptedException e) {
        e.printStackTrace();
      }
    }
  }

  public void runTest() {
    int numProducer = 3;
    int tasksPerProducer = 10;

    for (int i = 0; i < numProducer; i++) {
      new Thread(new Producer(tasksPerProducer)).start();
    }

    int numConsumer = numProducer; // producers will put N EOFs to kill N consumers.
    for (int i = 0; i < numConsumer; i++) {
      new Thread(new Consumer()).start();
    }
  }

  public static void main(String[] args) throws IOException {
    Test4 t = new Test4();
    t.runTest();
  }
}

Instead of using two semaphores, why not use a single semaphore to synchronize between the threads? (link)

Additionally, you can use ArrayBlockingQueue, which is thread-safe, to properly demonstrate the Producer-Consumer problem.
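A minimal sketch of the ArrayBlockingQueue suggestion might look like the following. The class name BlockingQueueDemo, the capacity of 3, and the EOF marker value are assumptions made for illustration; the stopping technique is the simulated EOF idea from the earlier answer, not something this answer prescribes. put blocks while the queue is full and take blocks while it is empty, so no explicit semaphores are needed.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

class BlockingQueueDemo {

    // Hypothetical end-of-stream marker; assumes real items are never this value.
    private static final int EOF = Integer.MIN_VALUE;

    // Runs one producer and one consumer; returns the items in consumption order.
    static List<Integer> run(int totalTasks) {
        BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(3); // bounded buffer
        List<Integer> consumed = new ArrayList<>(); // written only by the consumer thread

        Thread producer = new Thread(() -> {
            try {
                for (int i = 0; i < totalTasks; i++) {
                    queue.put(i); // blocks while the queue is full
                }
                queue.put(EOF); // tell the consumer there is nothing more
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        Thread consumer = new Thread(() -> {
            try {
                while (true) {
                    int item = queue.take(); // blocks while the queue is empty
                    if (item == EOF) {
                        break;
                    }
                    consumed.add(item);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        producer.start();
        consumer.start();
        try {
            producer.join();
            consumer.join(); // after join, reading consumed from this thread is safe
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return consumed;
    }

    public static void main(String[] args) {
        System.out.println("Consumed: " + run(5));
        // prints "Consumed: [0, 1, 2, 3, 4]"
    }
}
```

With several consumers, one EOF per consumer would be needed, as in the Test4 example above.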