Consumer/Producer ExecutorService 卡住了

Consumer/Producer with ExecutorService is stuck

我是多线程新手,我正在尝试编写以下简单程序:

  1. 读取一个文件
  2. 将输出打印到屏幕

我创建了以下 类:消费者

public class Consumer implements Runnable {
    private BlockingQueue<String> m_Queue;

public Consumer(BlockingQueue<String> i_Queue)
{
    m_Queue = i_Queue;
}

@Override
public void run()
{
    try
    {
        String referenceID1;

        //Consuming message until exit message is received.
        while((referenceID1 = m_Queue.take()) !="EOF")
        {
            System.out.println(referenceID1);
        }
    }
    catch (Exception e)
    {
        e.printStackTrace();
    }
}}

制作人:

public class Producer implements Runnable {
private BlockingQueue<String> m_Queue;
private String m_FilePath;

public Producer(BlockingQueue<String> i_Queue, String i_FilePath)
{
    m_Queue = i_Queue;
    m_FilePath = i_FilePath;
}

@Override
public void run()
{
    try (BufferedReader reader = new BufferedReader(new FileReader(m_FilePath)))
    {
        String line;
        while ((line = reader.readLine()) != null)
        {
            m_Queue.put(line);
            System.out.println(line + " Was added to queue.");
        }

        //Adding an exit message.
        m_Queue.put("EOF");
        System.out.println("EOF Was added to queue.");
    }
    catch (IOException | InterruptedException e)
    {
        e.printStackTrace();
    }
}}

生产者消费者服务

public static void main(String[] args) {
    ExecutorService threadPool = Executors.newFixedThreadPool(5);
    BlockingQueue<String> queue = new ArrayBlockingQueue<>(100);

    //Start the producer; Will read the file
    threadPool.execute(new Producer(queue, args[0]));

    for (int i = 0; i < 4; i++)
    {
        System.out.println("Generating consumer " + i+1);
        threadPool.execute(new Consumer(queue));
    }

    try
    {
        threadPool.shutdown();
        System.out.println("Shutting down threads.");

        threadPool.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS);
        System.out.println("Terminated successfully.");
    }
    catch (InterruptedException e)
    {
        e.printStackTrace();
    } }

输入文件由数字 1-20 组成,每个数字占一行。 当我 运行 程序时,我可以看到所有数字都已读取,但程序似乎 hang/stuck 而我没有看到 "Terminated successfully" 消息。
如果我使用单个线程读取屏幕并使用单个线程在屏幕上打印,则不会发生这种情况,但使用 1 个线程违背了我对 "multi-threaded" 程序的需求。
我的猜测是我忘记释放资源,但我不知道为什么。

您的问题是您在 Consumer 中使用了 take(),其中:

Retrieves and removes the head of this queue, waiting if necessary until an element becomes available.

然后你测试返回值是否为 EOF(不正确顺便说一句,你需要使用 equals)但是你只将它放入你的队列一次并且你有 4 Consumer 所以 3 Consumer 还是等吧。

所以你应该使用 poll() 代替如下:

while((referenceID1 = m_Queue.poll()) != null && !Objects.equals(referenceID1, "EOF"))

或者如果你去掉 EOF

while((referenceID1 = m_Queue.poll()) != null)