阻塞队列为空时杀死消费者
Kill consumers when blockingqueue is empty
我正在阅读有关阻塞队列、executoreserivce 和生产者-消费者范例的内容。
我想要不断变化的生产者数量和不断变化的消费者数量。每个生产者将追加到队列中,消费者将消费消息并处理它们。
我的问题是——生产者如何知道消费者已经完成,不再有消息进入队列?
我想在我的主线程中添加一个计数器。当生产者启动时,我将增加计数器,当每个生产者结束时,他们将减少 int。
我的消费者将能够知道计数器,当它达到 0 并且队列中没有更多元素时,他们就会死亡。
关于同步工作的另一个常见问题 - 主线程应该读取队列的内容,并为每条消息添加执行器,还是最好让线程了解此逻辑并自行决定什么时候死?
当系统启动时,我收到一个数字,它决定了将启动多少生产者。每个生产者将生成一组随机数字到队列中。消费者会将这些数字打印到日志中。我遇到的问题是,一旦我知道最后一个生产者推入了最后一个数字,我仍然不明白如何让消费者知道不会有更多的数字进来,他们应该关闭。
消费者如何知道生产者何时完成?
生产者完成后,最后一个可以打断所有消费者和(可能)生产者。
InterruptedException
每当阻塞调用(put()
或 take()
)被另一个线程通过 thread.interrupt()
中断时抛出,其中 thread
是调用方法的线程。当最后一个生产者完成时,它可以中断所有其他线程,这将导致所有阻塞方法抛出 InterruptedException
,允许您终止相应的线程。
final BlockingQueue<T> queue = ...;
final List<Thread> threads = new ArrayList<>();
threads.add(new Producer1());
threads.add(new Producer2());
threads.add(new Consumer1());
threads.add(new Consumer2());
threads.forEach(Thread::start);
// Done by the last producer, or any other thread
threads.forEach(Thread::interrupt);
class Producer extends Thread {
@Override
public void run() {
for (int i = 0; i < X; i++) {
T element;
// Produce element
try {
queue.put(element);
} catch (InterruptedException e) {
break; // Optional, only if other producers may still be running and
// you want to stop them, or interruption is performed by
// a completely different thread
}
}
}
}
class Consumer extends Thread {
@Override
public void run() {
while (true) {
T element;
try {
element = queue.take();
} catch (InterruptedException e) {
break;
}
// Consume element
}
}
}
这个问题的一个优雅的解决方案是使用 PoisonPill 模式。这是它如何工作的一个例子。在这种情况下,您只需要知道生产者的数量。
编辑:我更新了代码以在最后一个消费者完成工作时清除队列。
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;
public class PoisonPillsTests {
interface Message {
}
interface PoisonPill extends Message {
PoisonPill INSTANCE = new PoisonPill() {
};
}
static class TextMessage implements Message {
private final String text;
public TextMessage(String text) {
this.text = text;
}
public String getText() {
return text;
}
@Override
public String toString() {
return text;
}
}
static class Producer implements Runnable {
private final String producerName;
private final AtomicInteger producersCount;
private final BlockingQueue<Message> messageBlockingQueue;
public Producer(String producerName, BlockingQueue<Message> messageBlockingQueue, AtomicInteger producersCount) {
this.producerName = producerName;
this.messageBlockingQueue = messageBlockingQueue;
this.producersCount = producersCount;
}
@Override
public void run() {
try {
for (int i = 0; i < 100; i++) {
messageBlockingQueue.put(new TextMessage("Producer " + producerName + " message " + i));
}
if (producersCount.decrementAndGet() <= 0) {
//we need this producersCount so that the producers to produce a single poison pill
messageBlockingQueue.put(PoisonPill.INSTANCE);
}
} catch (InterruptedException e) {
throw new RuntimeException("Producer interrupted", e);
}
}
}
static class Consumer implements Runnable {
private final AtomicInteger consumersCount;
private final AtomicInteger consumedMessages;
private final BlockingQueue<Message> messageBlockingQueue;
public Consumer(BlockingQueue<Message> messageBlockingQueue, AtomicInteger consumersCount, AtomicInteger consumedMessages) {
this.messageBlockingQueue = messageBlockingQueue;
this.consumersCount = consumersCount;
this.consumedMessages = consumedMessages;
}
@Override
public void run() {
try {
while (true) {
Message message = null;
message = messageBlockingQueue.take();
if (message instanceof PoisonPill) {
//we put back the poison pill so that to be consumed by the next consumer
messageBlockingQueue.put(message);
break;
} else {
consumedMessages.incrementAndGet();
System.out.println("Consumer got message " + message);
}
}
} catch (InterruptedException e) {
throw new RuntimeException("Consumer interrupted", e);
} finally {
if (consumersCount.decrementAndGet() <= 0) {
System.out.println("Last consumer, clearing the queue");
messageBlockingQueue.clear();
}
}
}
}
public static void main(String[] args) {
final AtomicInteger producerCount = new AtomicInteger(4);
final AtomicInteger consumersCount = new AtomicInteger(2);
final AtomicInteger consumedMessages = new AtomicInteger();
BlockingQueue<Message> messageBlockingQueue = new LinkedBlockingQueue<>();
List<CompletableFuture<Void>> tasks = new ArrayList<>();
for (int i = 0; i < producerCount.get(); i++) {
tasks.add(CompletableFuture.runAsync(new Producer("" + (i + 1), messageBlockingQueue, producerCount)));
}
for (int i = 0; i < consumersCount.get(); i++) {
tasks.add(CompletableFuture.runAsync(new Consumer(messageBlockingQueue, consumersCount, consumedMessages)));
}
CompletableFuture.allOf(tasks.toArray(new CompletableFuture[0])).join();
System.out.println("Consumed " + consumedMessages + " messages");
}
}
我正在阅读有关阻塞队列、executoreserivce 和生产者-消费者范例的内容。 我想要不断变化的生产者数量和不断变化的消费者数量。每个生产者将追加到队列中,消费者将消费消息并处理它们。 我的问题是——生产者如何知道消费者已经完成,不再有消息进入队列? 我想在我的主线程中添加一个计数器。当生产者启动时,我将增加计数器,当每个生产者结束时,他们将减少 int。 我的消费者将能够知道计数器,当它达到 0 并且队列中没有更多元素时,他们就会死亡。
关于同步工作的另一个常见问题 - 主线程应该读取队列的内容,并为每条消息添加执行器,还是最好让线程了解此逻辑并自行决定什么时候死?
当系统启动时,我收到一个数字,它决定了将启动多少生产者。每个生产者将生成一组随机数字到队列中。消费者会将这些数字打印到日志中。我遇到的问题是,一旦我知道最后一个生产者推入了最后一个数字,我仍然不明白如何让消费者知道不会有更多的数字进来,他们应该关闭。
消费者如何知道生产者何时完成?
生产者完成后,最后一个可以打断所有消费者和(可能)生产者。
InterruptedException
每当阻塞调用(put()
或 take()
)被另一个线程通过 thread.interrupt()
中断时抛出,其中 thread
是调用方法的线程。当最后一个生产者完成时,它可以中断所有其他线程,这将导致所有阻塞方法抛出 InterruptedException
,允许您终止相应的线程。
final BlockingQueue<T> queue = ...;
final List<Thread> threads = new ArrayList<>();
threads.add(new Producer1());
threads.add(new Producer2());
threads.add(new Consumer1());
threads.add(new Consumer2());
threads.forEach(Thread::start);
// Done by the last producer, or any other thread
threads.forEach(Thread::interrupt);
class Producer extends Thread {
@Override
public void run() {
for (int i = 0; i < X; i++) {
T element;
// Produce element
try {
queue.put(element);
} catch (InterruptedException e) {
break; // Optional, only if other producers may still be running and
// you want to stop them, or interruption is performed by
// a completely different thread
}
}
}
}
class Consumer extends Thread {
@Override
public void run() {
while (true) {
T element;
try {
element = queue.take();
} catch (InterruptedException e) {
break;
}
// Consume element
}
}
}
这个问题的一个优雅的解决方案是使用 PoisonPill 模式。这是它如何工作的一个例子。在这种情况下,您只需要知道生产者的数量。
编辑:我更新了代码以在最后一个消费者完成工作时清除队列。
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;
public class PoisonPillsTests {
interface Message {
}
interface PoisonPill extends Message {
PoisonPill INSTANCE = new PoisonPill() {
};
}
static class TextMessage implements Message {
private final String text;
public TextMessage(String text) {
this.text = text;
}
public String getText() {
return text;
}
@Override
public String toString() {
return text;
}
}
static class Producer implements Runnable {
private final String producerName;
private final AtomicInteger producersCount;
private final BlockingQueue<Message> messageBlockingQueue;
public Producer(String producerName, BlockingQueue<Message> messageBlockingQueue, AtomicInteger producersCount) {
this.producerName = producerName;
this.messageBlockingQueue = messageBlockingQueue;
this.producersCount = producersCount;
}
@Override
public void run() {
try {
for (int i = 0; i < 100; i++) {
messageBlockingQueue.put(new TextMessage("Producer " + producerName + " message " + i));
}
if (producersCount.decrementAndGet() <= 0) {
//we need this producersCount so that the producers to produce a single poison pill
messageBlockingQueue.put(PoisonPill.INSTANCE);
}
} catch (InterruptedException e) {
throw new RuntimeException("Producer interrupted", e);
}
}
}
static class Consumer implements Runnable {
private final AtomicInteger consumersCount;
private final AtomicInteger consumedMessages;
private final BlockingQueue<Message> messageBlockingQueue;
public Consumer(BlockingQueue<Message> messageBlockingQueue, AtomicInteger consumersCount, AtomicInteger consumedMessages) {
this.messageBlockingQueue = messageBlockingQueue;
this.consumersCount = consumersCount;
this.consumedMessages = consumedMessages;
}
@Override
public void run() {
try {
while (true) {
Message message = null;
message = messageBlockingQueue.take();
if (message instanceof PoisonPill) {
//we put back the poison pill so that to be consumed by the next consumer
messageBlockingQueue.put(message);
break;
} else {
consumedMessages.incrementAndGet();
System.out.println("Consumer got message " + message);
}
}
} catch (InterruptedException e) {
throw new RuntimeException("Consumer interrupted", e);
} finally {
if (consumersCount.decrementAndGet() <= 0) {
System.out.println("Last consumer, clearing the queue");
messageBlockingQueue.clear();
}
}
}
}
public static void main(String[] args) {
final AtomicInteger producerCount = new AtomicInteger(4);
final AtomicInteger consumersCount = new AtomicInteger(2);
final AtomicInteger consumedMessages = new AtomicInteger();
BlockingQueue<Message> messageBlockingQueue = new LinkedBlockingQueue<>();
List<CompletableFuture<Void>> tasks = new ArrayList<>();
for (int i = 0; i < producerCount.get(); i++) {
tasks.add(CompletableFuture.runAsync(new Producer("" + (i + 1), messageBlockingQueue, producerCount)));
}
for (int i = 0; i < consumersCount.get(); i++) {
tasks.add(CompletableFuture.runAsync(new Consumer(messageBlockingQueue, consumersCount, consumedMessages)));
}
CompletableFuture.allOf(tasks.toArray(new CompletableFuture[0])).join();
System.out.println("Consumed " + consumedMessages + " messages");
}
}