查找生产者是否完成角色生成

Find if the producer is completed the character generation

我有生成随机字符的生产者代码:

public class Producer implements Runnable {



    @Override
    public void run() {

        Stream<Character> generate = Stream.generate(this::generateRandomCharacter).limit(15);

        generate.forEach(character -> {

            MyEvent myEvent = new MyEvent();
            myEvent.setMesage(character + "");

            LOG.info("Producer: " + name + " is waiting to transfer...");

            try {
                boolean added = transferQueue.tryTransfer(myEvent, 4000, TimeUnit.MILLISECONDS);

                if (added) {
                    numberOfProducedMessages.incrementAndGet();
                    LOG.info("Producer: " + name + " transferred element: A");
                } else {
                    LOG.info("can not add an element due to the timeout");
                }

            } catch (InterruptedException e) {
                e.printStackTrace();
            }

        });
    }

}

提供消费者代码:

public class Consumer implements Runnable {

    private static final Logger LOG = Logger.getLogger(Consumer.class.getName());

    private final TransferQueue<MyEvent> transferQueue;

    private final String name;

    final int numberOfMessagesToConsume;

    final AtomicInteger numberOfConsumedMessages = new AtomicInteger();


    Consumer(TransferQueue<MyEvent> transferQueue, String name, int numberOfMessagesToConsume) {

        this.transferQueue = transferQueue;
        this.name = name;
        this.numberOfMessagesToConsume = numberOfMessagesToConsume;
    }

    @Override
    public void run() {

        

            while (true){

                try {
                    LOG.info("Consumer: " + name + " is waiting to take element...");

                    MyEvent element = transferQueue.take();
                    longProcessing(element);

                    System.out.println("Consumer: " + name + " received element with messgae : " + element.getMesage());

                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        

    }

    private void longProcessing(MyEvent element) throws InterruptedException {

        numberOfConsumedMessages.incrementAndGet();
        Thread.sleep(5);
    }
}

这是对消费者/生产者的调用:

TransferQueue<Event> transferQueue = new LinkedTransferQueue<>();
        ExecutorService exService = Executors.newFixedThreadPool(2);

        Producer producer = new Producer( transferQueue, "1", 2);
        Consumer consumer = new Consumer(transferQueue, "1", 2);

        exService.execute(producer);
        exService.execute(consumer);


        boolean isShutDown = exService.awaitTermination(5000, TimeUnit.MILLISECONDS);

        if (!isShutDown) {

            exService.shutdown();
        }

生产者将只创建有限数量的字符供消费者消费。我怎么知道制作人是否完成角色生成?

我考虑实施超时以了解生产者是否不再发送任何字符,但此实施可能有更好的选择。

您可以只从生产者发送一个带有消息的事件,例如。 “完成”。

然后在您的消费者中,只需检查此消息即可知道流已完成。

超时不是个好主意,因为它可能出于与关闭流不同的原因而发生。

有多种替代方法可以实现此目的:

  1. 使用特殊类型的事件来表明生产者已经完成。 (这基本上就是 Krzysztof Cichocki 的回答所暗示的)。优点:简单。缺点:您必须确保您选择表示“完成”的任何特殊事件都不可能是生产者发出的真实事件。

  2. 使用计数。看起来这就是您的代码已经在尝试执行的操作。例如,将 numberOfMessagesToConsume 参数中的 15 传递给消费者构造函数,然后 Run() 方法在消费了 15 条消息后停止。优点:简单。缺点:不灵活,你可能事先不知道生产者会产生多少消息。

  3. 监控生产者线程的状态。例如,消费者可以检查 while (producerThread.isAlive()) {...}。生产者线程将在完成消息生产后终止。优点:灵活性。缺点:您不希望消费者了解生产者线程,因为耦合度太高。例如,您可以使用 new Thread(...) 启动生产者,或者您可以使用 ExecutorServiceCompletableFuture。消费者不需要知道。

减轻选项 3 缺点的一种方法是将一个函数传递给消费者,以将生产者状态的测试与线程细节分离:

构造函数:

Consumer(TransferQueue<MyEvent> transferQueue, String name, BooleanSupplier isProducerStillProducing)

使用 lambda 调用构造函数:

new Consumer(transferQueue, name, () -> producerThread.isAlive())

run()方法中测试:

while (isProducerStillProducing.getAsBoolean()) { ... }

有时协调生产者和消费者的关闭可能是一项非常令人费解的任务。有时,由于语法差异,一种编程语言比另一种编程语言更容易。 以下使用 Ada 编程语言编写的示例创建了一个生产者和一个消费者。生产者向消费者发送一系列字符。消费者在接收到每个字符时将其打印出来。生产者终止时消费者也终止。

此示例使用 Ada Rendezvous 机制在任务(也称为线程)之间进行通信。

with Ada.Text_IO; use Ada.Text_IO;

procedure Main is
   task producer;
   task consumer is
      entry send (Item : in Character);
   end consumer;

   task body producer is
      subtype lower is Character range 'a' .. 'z';
      subtype upper is Character range 'A' .. 'Z';
   begin
      for C in lower loop
         consumer.send (C);
         delay 0.05;
      end loop;
      for C in upper loop
         consumer.send (C);
         delay 0.05;
      end loop;
   end producer;

   task body consumer is
      Char : Character;
   begin
      loop
         select
            accept send (Item : in Character) do
               Char := Item;
            end send;
            Put (Char);
            if Char = 'z' then
               New_Line(2);
            end if;
         or
            terminate;
         end select;
      end loop;
   end consumer;

begin
   null;
end Main;

这个程序的输出是:

abcdefghijklmnopqrstuvwxyz

ABCDEFGHIJKLMNOPQRSTUVWXYZ