阻塞队列为空时杀死消费者

Kill consumers when blockingqueue is empty

我正在阅读有关阻塞队列、executoreserivce 和生产者-消费者范例的内容。 我想要不断变化的生产者数量和不断变化的消费者数量。每个生产者将追加到队列中,消费者将消费消息并处理它们。 我的问题是——生产者如何知道消费者已经完成,不再有消息进入队列? 我想在我的主线程中添加一个计数器。当生产者启动时,我将增加计数器,当每个生产者结束时,他们将减少 int。 我的消费者将能够知道计数器,当它达到 0 并且队列中没有更多元素时,他们就会死亡。

关于同步工作的另一个常见问题 - 主线程应该读取队列的内容,并为每条消息添加执行器,还是最好让线程了解此逻辑并自行决定什么时候死?

当系统启动时,我收到一个数字,它决定了将启动多少生产者。每个生产者将生成一组随机数字到队列中。消费者会将这些数字打印到日志中。我遇到的问题是,一旦我知道最后一个生产者推入了最后一个数字,我仍然不明白如何让消费者知道不会有更多的数字进来,他们应该关闭。

消费者如何知道生产者何时完成?

生产者完成后,最后一个可以打断所有消费者和(可能)生产者。

InterruptedException 每当阻塞调用(put()take())被另一个线程通过 thread.interrupt() 中断时抛出,其中 thread 是调用方法的线程。当最后一个生产者完成时,它可以中断所有其他线程,这将导致所有阻塞方法抛出 InterruptedException,允许您终止相应的线程。

final BlockingQueue<T> queue = ...;
final List<Thread> threads = new ArrayList<>();

threads.add(new Producer1());
threads.add(new Producer2());
threads.add(new Consumer1());
threads.add(new Consumer2());
threads.forEach(Thread::start);

// Done by the last producer, or any other thread
threads.forEach(Thread::interrupt);

class Producer extends Thread {
    @Override
    public void run() {
        for (int i = 0; i < X; i++) {
            T element;
            // Produce element
            try {
                queue.put(element);
            } catch (InterruptedException e) {
                break; // Optional, only if other producers may still be running and
                       // you want to stop them, or interruption is performed by
                       // a completely different thread
            }
        }
    }
}

class Consumer extends Thread {
    @Override
    public void run() {
        while (true) {
            T element;
            try {
                element = queue.take();
            } catch (InterruptedException e) {
                break;
            }
            // Consume element
        }
    }
}

这个问题的一个优雅的解决方案是使用 PoisonPill 模式。这是它如何工作的一个例子。在这种情况下,您只需要知道生产者的数量。

编辑:我更新了代码以在最后一个消费者完成工作时清除队列。

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;

public class PoisonPillsTests {

    interface Message {

    }

    interface PoisonPill extends Message {
        PoisonPill INSTANCE = new PoisonPill() {
        };
    }

    static class TextMessage implements Message {

        private final String text;

        public TextMessage(String text) {
            this.text = text;
        }

        public String getText() {
            return text;
        }

        @Override
        public String toString() {
            return text;
        }
    }

    static class Producer implements Runnable {

        private final String producerName;
        private final AtomicInteger producersCount;
        private final BlockingQueue<Message> messageBlockingQueue;

        public Producer(String producerName, BlockingQueue<Message> messageBlockingQueue, AtomicInteger producersCount) {
            this.producerName = producerName;
            this.messageBlockingQueue = messageBlockingQueue;
            this.producersCount = producersCount;
        }

        @Override
        public void run() {
            try {
                for (int i = 0; i < 100; i++) {
                    messageBlockingQueue.put(new TextMessage("Producer " + producerName + " message " + i));
                }
                if (producersCount.decrementAndGet() <= 0) {
                    //we need this producersCount so that the producers to produce a single poison pill
                    messageBlockingQueue.put(PoisonPill.INSTANCE);
                }
            } catch (InterruptedException e) {
                throw new RuntimeException("Producer interrupted", e);
            }
        }
    }

    static class Consumer implements Runnable {

        private final AtomicInteger consumersCount;
        private final AtomicInteger consumedMessages;
        private final BlockingQueue<Message> messageBlockingQueue;

        public Consumer(BlockingQueue<Message> messageBlockingQueue, AtomicInteger consumersCount, AtomicInteger consumedMessages) {
            this.messageBlockingQueue = messageBlockingQueue;
            this.consumersCount = consumersCount;
            this.consumedMessages = consumedMessages;
        }

        @Override
        public void run() {
            try {
                while (true) {
                    Message message = null;
                    message = messageBlockingQueue.take();

                    if (message instanceof PoisonPill) {
                        //we put back the poison pill so that to be consumed by the next consumer
                        messageBlockingQueue.put(message);
                        break;
                    } else {
                        consumedMessages.incrementAndGet();
                        System.out.println("Consumer got message " + message);
                    }
                }
            } catch (InterruptedException e) {
                throw new RuntimeException("Consumer interrupted", e);
            } finally {
                if (consumersCount.decrementAndGet() <= 0) {
                    System.out.println("Last consumer, clearing the queue");
                    messageBlockingQueue.clear();
                }
            }
        }
    }

    public static void main(String[] args) {

        final AtomicInteger producerCount = new AtomicInteger(4);
        final AtomicInteger consumersCount = new AtomicInteger(2);
        final AtomicInteger consumedMessages = new AtomicInteger();
        BlockingQueue<Message> messageBlockingQueue = new LinkedBlockingQueue<>();


        List<CompletableFuture<Void>> tasks = new ArrayList<>();
        for (int i = 0; i < producerCount.get(); i++) {
            tasks.add(CompletableFuture.runAsync(new Producer("" + (i + 1), messageBlockingQueue, producerCount)));
        }

        for (int i = 0; i < consumersCount.get(); i++) {
            tasks.add(CompletableFuture.runAsync(new Consumer(messageBlockingQueue, consumersCount, consumedMessages)));
        }

        CompletableFuture.allOf(tasks.toArray(new CompletableFuture[0])).join();

        System.out.println("Consumed " + consumedMessages + " messages");

    }
}