消费者在我 Java 中的简单 producer/consumer/queue 代码中不起作用
Consumer doesn't work in my simple producer/consumer/queue code in Java
我正在尝试在Java11中实现一个简单的producer/consumer系统。基本上,我每个都取两个线程,加上一个全局队列,简单如下:
- 全局优先级队列。
- 第一个线程,生产者,运行 HTTP 服务器,侦听传入的 HTTP 消息,并在收到消息后,
pushes
将其作为作业添加到队列(queue.size
递增)
- 第二个线程,消费者,连续
peeks
队列。如果有作业(job ! = null
),在某处提交 HTTP 请求,成功接收后,从队列中轮询(queue.size()
递减)。
骨架如下:
主要Class:
public class Manager
{
private Consumer consumer;
private Producer producer;
Queue queue;
public static void main (String args[])
{
consumer = new Consumer();
producer = new Producer();
}
}
制作人class:
public class Producer implements Runnable
{
public Producer()
{
Thread producer = new Thread(this);
producer.start();
}
public void run()
{
//HTTP server starts, listens, and adds to the queue upon receiving a Job
server.start();
Manager.queue.add(new Job());
}
}
消费者class:
public class Consumer implements Runnable
{
public Consumer()
{
Thread consumer = new Thread(this);
consumer.start();
}
public void run()
{
// Thread.sleep(1);
while(true)
{
//get an object off the queue
Job job= Manager.queue.peek();
//do some stuff with the object
}
}
}
Producer
和 queue
工作正常。但问题在于 Consumer
。上面的 Consumer 代码(使用 while(true)
循环)不会查看项目。但是当我在 while(true)
循环之前添加一个 Thread.sleep(x)
时,即使 x=1 ms
也有效,并成功抓取项目。
问题是什么?理论上,while(true)
循环应该没有问题!为什么它看不到 peek
项目?!
问题原因:队列读写不同步
这里发生的是两个线程,运行 在不同的 CPU 核心上使用它们自己的队列副本,因此生产者可能正在添加内容,这些更改甚至可能得到传播进入 RAM,但消费者从不检查 RAM 中的任何内容,因为它有自己的队列缓存副本, 保持为空。
Thread.sleep()
的东西起作用了,因为当醒来时,线程必须从 RAM 中获取所有 ist 东西,它可能已经改变了。
正确的做法是只访问队列,当它同步时如下:
制作人:
synchronized(Manager.queue) {
Manager.queue.add(new Job());
}
在消费者中:
boolean continue = true;
while (continue) {
synchronized(Manager.queue) {
Job job=Manager.queue.pop();
}
}
最后一点:while (true)
的效率非常低,您可以使用 Object.wait()
和 Object.notify()
做一些事情
制作人:
synchronized(Manager.queue) {
Manager.queue.add(new Job());
Manager.queue.notify();
}
在消费者中:
boolean continue = true;
while (continue) {
synchronized(Manager.queue) {
while (Manager.queue.peek() == null) {
Manager.queue.wait();
}
Job job=Manager.queue.pop();
}
}
PriorityQueue
是 不是 线程安全的,而 PriorityBlockingQueue
是 。只要您不使用 BlockingQueue
接口中定义的任何方法,这两个实现就可以互换。只需将 PriorityQueue
更改为 PriorityBlockingQueue
即可解决您的问题。
我正在尝试在Java11中实现一个简单的producer/consumer系统。基本上,我每个都取两个线程,加上一个全局队列,简单如下:
- 全局优先级队列。
- 第一个线程,生产者,运行 HTTP 服务器,侦听传入的 HTTP 消息,并在收到消息后,
pushes
将其作为作业添加到队列(queue.size
递增) - 第二个线程,消费者,连续
peeks
队列。如果有作业(job ! = null
),在某处提交 HTTP 请求,成功接收后,从队列中轮询(queue.size()
递减)。
骨架如下:
主要Class:
public class Manager
{
private Consumer consumer;
private Producer producer;
Queue queue;
public static void main (String args[])
{
consumer = new Consumer();
producer = new Producer();
}
}
制作人class:
public class Producer implements Runnable
{
public Producer()
{
Thread producer = new Thread(this);
producer.start();
}
public void run()
{
//HTTP server starts, listens, and adds to the queue upon receiving a Job
server.start();
Manager.queue.add(new Job());
}
}
消费者class:
public class Consumer implements Runnable
{
public Consumer()
{
Thread consumer = new Thread(this);
consumer.start();
}
public void run()
{
// Thread.sleep(1);
while(true)
{
//get an object off the queue
Job job= Manager.queue.peek();
//do some stuff with the object
}
}
}
Producer
和 queue
工作正常。但问题在于 Consumer
。上面的 Consumer 代码(使用 while(true)
循环)不会查看项目。但是当我在 while(true)
循环之前添加一个 Thread.sleep(x)
时,即使 x=1 ms
也有效,并成功抓取项目。
问题是什么?理论上,while(true)
循环应该没有问题!为什么它看不到 peek
项目?!
问题原因:队列读写不同步
这里发生的是两个线程,运行 在不同的 CPU 核心上使用它们自己的队列副本,因此生产者可能正在添加内容,这些更改甚至可能得到传播进入 RAM,但消费者从不检查 RAM 中的任何内容,因为它有自己的队列缓存副本, 保持为空。
Thread.sleep()
的东西起作用了,因为当醒来时,线程必须从 RAM 中获取所有 ist 东西,它可能已经改变了。
正确的做法是只访问队列,当它同步时如下:
制作人:
synchronized(Manager.queue) {
Manager.queue.add(new Job());
}
在消费者中:
boolean continue = true;
while (continue) {
synchronized(Manager.queue) {
Job job=Manager.queue.pop();
}
}
最后一点:while (true)
的效率非常低,您可以使用 Object.wait()
和 Object.notify()
制作人:
synchronized(Manager.queue) {
Manager.queue.add(new Job());
Manager.queue.notify();
}
在消费者中:
boolean continue = true;
while (continue) {
synchronized(Manager.queue) {
while (Manager.queue.peek() == null) {
Manager.queue.wait();
}
Job job=Manager.queue.pop();
}
}
PriorityQueue
是 不是 线程安全的,而 PriorityBlockingQueue
是 。只要您不使用 BlockingQueue
接口中定义的任何方法,这两个实现就可以互换。只需将 PriorityQueue
更改为 PriorityBlockingQueue
即可解决您的问题。