Consumer/Producer ExecutorService 卡住了
Consumer/Producer with ExecutorService is stuck
我是多线程新手,我正在尝试编写以下简单程序:
- 读取一个文件
- 将输出打印到屏幕
我创建了以下 类:消费者
public class Consumer implements Runnable {
private BlockingQueue<String> m_Queue;
public Consumer(BlockingQueue<String> i_Queue)
{
m_Queue = i_Queue;
}
@Override
public void run()
{
try
{
String referenceID1;
//Consuming message until exit message is received.
while((referenceID1 = m_Queue.take()) !="EOF")
{
System.out.println(referenceID1);
}
}
catch (Exception e)
{
e.printStackTrace();
}
}}
制作人:
public class Producer implements Runnable {
private BlockingQueue<String> m_Queue;
private String m_FilePath;
public Producer(BlockingQueue<String> i_Queue, String i_FilePath)
{
m_Queue = i_Queue;
m_FilePath = i_FilePath;
}
@Override
public void run()
{
try (BufferedReader reader = new BufferedReader(new FileReader(m_FilePath)))
{
String line;
while ((line = reader.readLine()) != null)
{
m_Queue.put(line);
System.out.println(line + " Was added to queue.");
}
//Adding an exit message.
m_Queue.put("EOF");
System.out.println("EOF Was added to queue.");
}
catch (IOException | InterruptedException e)
{
e.printStackTrace();
}
}}
生产者消费者服务
public static void main(String[] args) {
ExecutorService threadPool = Executors.newFixedThreadPool(5);
BlockingQueue<String> queue = new ArrayBlockingQueue<>(100);
//Start the producer; Will read the file
threadPool.execute(new Producer(queue, args[0]));
for (int i = 0; i < 4; i++)
{
System.out.println("Generating consumer " + i+1);
threadPool.execute(new Consumer(queue));
}
try
{
threadPool.shutdown();
System.out.println("Shutting down threads.");
threadPool.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS);
System.out.println("Terminated successfully.");
}
catch (InterruptedException e)
{
e.printStackTrace();
} }
输入文件由数字 1-20 组成,每个数字占一行。
当我 运行 程序时,我可以看到所有数字都已读取,但程序似乎 hang/stuck 而我没有看到 "Terminated successfully" 消息。
如果我使用单个线程读取屏幕并使用单个线程在屏幕上打印,则不会发生这种情况,但使用 1 个线程违背了我对 "multi-threaded" 程序的需求。
我的猜测是我忘记释放资源,但我不知道为什么。
您的问题是您在 Consumer
中使用了 take()
,其中:
Retrieves and removes the head of this queue, waiting if necessary
until an element becomes available.
然后你测试返回值是否为 EOF
(不正确顺便说一句,你需要使用 equals)但是你只将它放入你的队列一次并且你有 4 Consumer
所以 3 Consumer
还是等吧。
所以你应该使用 poll()
代替如下:
while((referenceID1 = m_Queue.poll()) != null && !Objects.equals(referenceID1, "EOF"))
或者如果你去掉 EOF
while((referenceID1 = m_Queue.poll()) != null)
我是多线程新手,我正在尝试编写以下简单程序:
- 读取一个文件
- 将输出打印到屏幕
我创建了以下 类:消费者
public class Consumer implements Runnable {
private BlockingQueue<String> m_Queue;
public Consumer(BlockingQueue<String> i_Queue)
{
m_Queue = i_Queue;
}
@Override
public void run()
{
try
{
String referenceID1;
//Consuming message until exit message is received.
while((referenceID1 = m_Queue.take()) !="EOF")
{
System.out.println(referenceID1);
}
}
catch (Exception e)
{
e.printStackTrace();
}
}}
制作人:
public class Producer implements Runnable {
private BlockingQueue<String> m_Queue;
private String m_FilePath;
public Producer(BlockingQueue<String> i_Queue, String i_FilePath)
{
m_Queue = i_Queue;
m_FilePath = i_FilePath;
}
@Override
public void run()
{
try (BufferedReader reader = new BufferedReader(new FileReader(m_FilePath)))
{
String line;
while ((line = reader.readLine()) != null)
{
m_Queue.put(line);
System.out.println(line + " Was added to queue.");
}
//Adding an exit message.
m_Queue.put("EOF");
System.out.println("EOF Was added to queue.");
}
catch (IOException | InterruptedException e)
{
e.printStackTrace();
}
}}
生产者消费者服务
public static void main(String[] args) {
ExecutorService threadPool = Executors.newFixedThreadPool(5);
BlockingQueue<String> queue = new ArrayBlockingQueue<>(100);
//Start the producer; Will read the file
threadPool.execute(new Producer(queue, args[0]));
for (int i = 0; i < 4; i++)
{
System.out.println("Generating consumer " + i+1);
threadPool.execute(new Consumer(queue));
}
try
{
threadPool.shutdown();
System.out.println("Shutting down threads.");
threadPool.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS);
System.out.println("Terminated successfully.");
}
catch (InterruptedException e)
{
e.printStackTrace();
} }
输入文件由数字 1-20 组成,每个数字占一行。
当我 运行 程序时,我可以看到所有数字都已读取,但程序似乎 hang/stuck 而我没有看到 "Terminated successfully" 消息。
如果我使用单个线程读取屏幕并使用单个线程在屏幕上打印,则不会发生这种情况,但使用 1 个线程违背了我对 "multi-threaded" 程序的需求。
我的猜测是我忘记释放资源,但我不知道为什么。
您的问题是您在 Consumer
中使用了 take()
,其中:
Retrieves and removes the head of this queue, waiting if necessary until an element becomes available.
然后你测试返回值是否为 EOF
(不正确顺便说一句,你需要使用 equals)但是你只将它放入你的队列一次并且你有 4 Consumer
所以 3 Consumer
还是等吧。
所以你应该使用 poll()
代替如下:
while((referenceID1 = m_Queue.poll()) != null && !Objects.equals(referenceID1, "EOF"))
或者如果你去掉 EOF
while((referenceID1 = m_Queue.poll()) != null)