单生产者多消费者Java
Single Producer Multiple Consumer Java
我是 Java 并发的新手,正在尝试 achieve/implement 单生产者 [P1] 和多消费者 [C1、C2、C3]。
想法是生产者 [P1] 放入值,消费者 C1、C2、C3 都运行他们的任务以单独读取 P1 放入的值。一旦 C1、C2、C3 读取值,P1 再次放入新数据。然后C1,C2,C3读取数据,如此循环。
等待通知对于单生产者单消费者工作正常,但在这种情况下单生产者多消费者等待通知概念看起来不是好的策略。我应该如何处理这个问题。
感谢@Ivan 和@Andreas。
@Ivan - 在他的评论中让我理解了生产者消费者模式的行为方式。
@Andreas - 在他的评论中建议使用Phaser。 (我改用循环屏障,因为我的注册线程数不会动态变化)
他们的评论共享以下示例代码。
如果有任何或更好的方法来处理这个问题,请提出即兴创作的建议。
主要Class
public static void main(String[] args)
{
SharedSpace sharedSpace = new SharedSpace(new LinkedBlockingQueue<Integer>(1));
new Thread(new Producer(sharedSpace)).start();
Consumer consumerRunnable = new Consumer(sharedSpace);
new Thread(consumerRunnable).start();
CyclicBarrier barrier = new CyclicBarrier(3,consumerRunnable);
new Thread(new EndUser(barrier,consumerRunnable)).start();
new Thread(new EndUser(barrier,consumerRunnable)).start();
new Thread(new EndUser(barrier,consumerRunnable)).start();
}
制作人
private SharedSpace sharedSpace;
public Producer(SharedSpace sharedSpace) {
super();
this.sharedSpace = sharedSpace;
}
public SharedSpace getSharedSpace() {
return sharedSpace;
}
public void setSharedSpace(SharedSpace sharedSpace) {
this.sharedSpace = sharedSpace;
}
@Override
public void run() {
for(int i=0;i<3;i++)
{
int value = (int) (Math.random()*30);
sharedSpace.addValue(value);
}
}
生产者和消费者共享队列
private BlockingQueue<Integer> queue;
public SharedSpace(BlockingQueue<Integer> queue) {
super();
this.queue = queue;
}
public BlockingQueue<Integer> getQueue() {
return queue;
}
public void setQueue(BlockingQueue<Integer> queue) {
this.queue = queue;
}
public void addValue(int value)
{
try {
queue.put(value);
System.out.println(System.nanoTime()+" Producer added value "+value);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
public int getValue() throws InterruptedException
{
return queue.take();
}
消费者
private SharedSpace sharedSpace;
private Integer value;
public Consumer(SharedSpace sharedSpace) {
super();
this.sharedSpace = sharedSpace;
}
public SharedSpace getSharedSpace() {
return sharedSpace;
}
public void setSharedSpace(SharedSpace sharedSpace) {
this.sharedSpace = sharedSpace;
}
public Integer getValue() {
return value;
}
public void setValue(Integer value) {
this.value = value;
}
@Override
public void run()
{
try {
setValue(sharedSpace.getValue());
} catch (InterruptedException e) {
e.printStackTrace();
}
}
最终用户
CyclicBarrier barrier;
Consumer consumer;
public EndUser(CyclicBarrier barrier) {
super();
this.barrier = barrier;
}
public EndUser(CyclicBarrier barrier, Consumer consumer) {
super();
this.barrier = barrier;
this.consumer = consumer;
}
public Consumer getConsumer() {
return consumer;
}
public void setConsumer(Consumer consumer) {
this.consumer = consumer;
}
public CyclicBarrier getBarrier() {
return barrier;
}
public void setBarrier(CyclicBarrier barrier) {
this.barrier = barrier;
}
@Override
public void run() {
try
{
while(true)
{
System.out.println(consumer.getValue());
barrier.await();
}
}
catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
e.printStackTrace();
}
}
输出[消费者不会从生产者那里读取数据,除非所有最终用户都获取了他们的数据]
Producer added value 24
Producer added value 10
24
24
24
10
10
Producer added value 0
10
0
0
0
我是 Java 并发的新手,正在尝试 achieve/implement 单生产者 [P1] 和多消费者 [C1、C2、C3]。
想法是生产者 [P1] 放入值,消费者 C1、C2、C3 都运行他们的任务以单独读取 P1 放入的值。一旦 C1、C2、C3 读取值,P1 再次放入新数据。然后C1,C2,C3读取数据,如此循环。
等待通知对于单生产者单消费者工作正常,但在这种情况下单生产者多消费者等待通知概念看起来不是好的策略。我应该如何处理这个问题。
感谢@Ivan 和@Andreas。
@Ivan - 在他的评论中让我理解了生产者消费者模式的行为方式。 @Andreas - 在他的评论中建议使用Phaser。 (我改用循环屏障,因为我的注册线程数不会动态变化)
他们的评论共享以下示例代码。 如果有任何或更好的方法来处理这个问题,请提出即兴创作的建议。
主要Class
public static void main(String[] args)
{
SharedSpace sharedSpace = new SharedSpace(new LinkedBlockingQueue<Integer>(1));
new Thread(new Producer(sharedSpace)).start();
Consumer consumerRunnable = new Consumer(sharedSpace);
new Thread(consumerRunnable).start();
CyclicBarrier barrier = new CyclicBarrier(3,consumerRunnable);
new Thread(new EndUser(barrier,consumerRunnable)).start();
new Thread(new EndUser(barrier,consumerRunnable)).start();
new Thread(new EndUser(barrier,consumerRunnable)).start();
}
制作人
private SharedSpace sharedSpace;
public Producer(SharedSpace sharedSpace) {
super();
this.sharedSpace = sharedSpace;
}
public SharedSpace getSharedSpace() {
return sharedSpace;
}
public void setSharedSpace(SharedSpace sharedSpace) {
this.sharedSpace = sharedSpace;
}
@Override
public void run() {
for(int i=0;i<3;i++)
{
int value = (int) (Math.random()*30);
sharedSpace.addValue(value);
}
}
生产者和消费者共享队列
private BlockingQueue<Integer> queue;
public SharedSpace(BlockingQueue<Integer> queue) {
super();
this.queue = queue;
}
public BlockingQueue<Integer> getQueue() {
return queue;
}
public void setQueue(BlockingQueue<Integer> queue) {
this.queue = queue;
}
public void addValue(int value)
{
try {
queue.put(value);
System.out.println(System.nanoTime()+" Producer added value "+value);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
public int getValue() throws InterruptedException
{
return queue.take();
}
消费者
private SharedSpace sharedSpace;
private Integer value;
public Consumer(SharedSpace sharedSpace) {
super();
this.sharedSpace = sharedSpace;
}
public SharedSpace getSharedSpace() {
return sharedSpace;
}
public void setSharedSpace(SharedSpace sharedSpace) {
this.sharedSpace = sharedSpace;
}
public Integer getValue() {
return value;
}
public void setValue(Integer value) {
this.value = value;
}
@Override
public void run()
{
try {
setValue(sharedSpace.getValue());
} catch (InterruptedException e) {
e.printStackTrace();
}
}
最终用户
CyclicBarrier barrier;
Consumer consumer;
public EndUser(CyclicBarrier barrier) {
super();
this.barrier = barrier;
}
public EndUser(CyclicBarrier barrier, Consumer consumer) {
super();
this.barrier = barrier;
this.consumer = consumer;
}
public Consumer getConsumer() {
return consumer;
}
public void setConsumer(Consumer consumer) {
this.consumer = consumer;
}
public CyclicBarrier getBarrier() {
return barrier;
}
public void setBarrier(CyclicBarrier barrier) {
this.barrier = barrier;
}
@Override
public void run() {
try
{
while(true)
{
System.out.println(consumer.getValue());
barrier.await();
}
}
catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
e.printStackTrace();
}
}
输出[消费者不会从生产者那里读取数据,除非所有最终用户都获取了他们的数据]
Producer added value 24
Producer added value 10
24
24
24
10
10
Producer added value 0
10
0
0
0