查找生产者是否完成角色生成
Find if the producer is completed the character generation
我有生成随机字符的生产者代码:
public class Producer implements Runnable {
@Override
public void run() {
Stream<Character> generate = Stream.generate(this::generateRandomCharacter).limit(15);
generate.forEach(character -> {
MyEvent myEvent = new MyEvent();
myEvent.setMesage(character + "");
LOG.info("Producer: " + name + " is waiting to transfer...");
try {
boolean added = transferQueue.tryTransfer(myEvent, 4000, TimeUnit.MILLISECONDS);
if (added) {
numberOfProducedMessages.incrementAndGet();
LOG.info("Producer: " + name + " transferred element: A");
} else {
LOG.info("can not add an element due to the timeout");
}
} catch (InterruptedException e) {
e.printStackTrace();
}
});
}
}
提供消费者代码:
public class Consumer implements Runnable {
private static final Logger LOG = Logger.getLogger(Consumer.class.getName());
private final TransferQueue<MyEvent> transferQueue;
private final String name;
final int numberOfMessagesToConsume;
final AtomicInteger numberOfConsumedMessages = new AtomicInteger();
Consumer(TransferQueue<MyEvent> transferQueue, String name, int numberOfMessagesToConsume) {
this.transferQueue = transferQueue;
this.name = name;
this.numberOfMessagesToConsume = numberOfMessagesToConsume;
}
@Override
public void run() {
while (true){
try {
LOG.info("Consumer: " + name + " is waiting to take element...");
MyEvent element = transferQueue.take();
longProcessing(element);
System.out.println("Consumer: " + name + " received element with messgae : " + element.getMesage());
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
private void longProcessing(MyEvent element) throws InterruptedException {
numberOfConsumedMessages.incrementAndGet();
Thread.sleep(5);
}
}
这是对消费者/生产者的调用:
TransferQueue<Event> transferQueue = new LinkedTransferQueue<>();
ExecutorService exService = Executors.newFixedThreadPool(2);
Producer producer = new Producer( transferQueue, "1", 2);
Consumer consumer = new Consumer(transferQueue, "1", 2);
exService.execute(producer);
exService.execute(consumer);
boolean isShutDown = exService.awaitTermination(5000, TimeUnit.MILLISECONDS);
if (!isShutDown) {
exService.shutdown();
}
生产者将只创建有限数量的字符供消费者消费。我怎么知道制作人是否完成角色生成?
我考虑实施超时以了解生产者是否不再发送任何字符,但此实施可能有更好的选择。
您可以只从生产者发送一个带有消息的事件,例如。 “完成”。
然后在您的消费者中,只需检查此消息即可知道流已完成。
超时不是个好主意,因为它可能出于与关闭流不同的原因而发生。
有多种替代方法可以实现此目的:
使用特殊类型的事件来表明生产者已经完成。 (这基本上就是 Krzysztof Cichocki 的回答所暗示的)。优点:简单。缺点:您必须确保您选择表示“完成”的任何特殊事件都不可能是生产者发出的真实事件。
使用计数。看起来这就是您的代码已经在尝试执行的操作。例如,将 numberOfMessagesToConsume
参数中的 15 传递给消费者构造函数,然后 Run()
方法在消费了 15 条消息后停止。优点:简单。缺点:不灵活,你可能事先不知道生产者会产生多少消息。
监控生产者线程的状态。例如,消费者可以检查 while (producerThread.isAlive()) {...}
。生产者线程将在完成消息生产后终止。优点:灵活性。缺点:您不希望消费者了解生产者线程,因为耦合度太高。例如,您可以使用 new Thread(...)
启动生产者,或者您可以使用 ExecutorService
或 CompletableFuture
。消费者不需要知道。
减轻选项 3 缺点的一种方法是将一个函数传递给消费者,以将生产者状态的测试与线程细节分离:
构造函数:
Consumer(TransferQueue<MyEvent> transferQueue, String name, BooleanSupplier isProducerStillProducing)
使用 lambda 调用构造函数:
new Consumer(transferQueue, name, () -> producerThread.isAlive())
在run()
方法中测试:
while (isProducerStillProducing.getAsBoolean()) { ... }
有时协调生产者和消费者的关闭可能是一项非常令人费解的任务。有时,由于语法差异,一种编程语言比另一种编程语言更容易。
以下使用 Ada 编程语言编写的示例创建了一个生产者和一个消费者。生产者向消费者发送一系列字符。消费者在接收到每个字符时将其打印出来。生产者终止时消费者也终止。
此示例使用 Ada Rendezvous 机制在任务(也称为线程)之间进行通信。
with Ada.Text_IO; use Ada.Text_IO;
procedure Main is
task producer;
task consumer is
entry send (Item : in Character);
end consumer;
task body producer is
subtype lower is Character range 'a' .. 'z';
subtype upper is Character range 'A' .. 'Z';
begin
for C in lower loop
consumer.send (C);
delay 0.05;
end loop;
for C in upper loop
consumer.send (C);
delay 0.05;
end loop;
end producer;
task body consumer is
Char : Character;
begin
loop
select
accept send (Item : in Character) do
Char := Item;
end send;
Put (Char);
if Char = 'z' then
New_Line(2);
end if;
or
terminate;
end select;
end loop;
end consumer;
begin
null;
end Main;
这个程序的输出是:
abcdefghijklmnopqrstuvwxyz
ABCDEFGHIJKLMNOPQRSTUVWXYZ
我有生成随机字符的生产者代码:
public class Producer implements Runnable {
@Override
public void run() {
Stream<Character> generate = Stream.generate(this::generateRandomCharacter).limit(15);
generate.forEach(character -> {
MyEvent myEvent = new MyEvent();
myEvent.setMesage(character + "");
LOG.info("Producer: " + name + " is waiting to transfer...");
try {
boolean added = transferQueue.tryTransfer(myEvent, 4000, TimeUnit.MILLISECONDS);
if (added) {
numberOfProducedMessages.incrementAndGet();
LOG.info("Producer: " + name + " transferred element: A");
} else {
LOG.info("can not add an element due to the timeout");
}
} catch (InterruptedException e) {
e.printStackTrace();
}
});
}
}
提供消费者代码:
public class Consumer implements Runnable {
private static final Logger LOG = Logger.getLogger(Consumer.class.getName());
private final TransferQueue<MyEvent> transferQueue;
private final String name;
final int numberOfMessagesToConsume;
final AtomicInteger numberOfConsumedMessages = new AtomicInteger();
Consumer(TransferQueue<MyEvent> transferQueue, String name, int numberOfMessagesToConsume) {
this.transferQueue = transferQueue;
this.name = name;
this.numberOfMessagesToConsume = numberOfMessagesToConsume;
}
@Override
public void run() {
while (true){
try {
LOG.info("Consumer: " + name + " is waiting to take element...");
MyEvent element = transferQueue.take();
longProcessing(element);
System.out.println("Consumer: " + name + " received element with messgae : " + element.getMesage());
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
private void longProcessing(MyEvent element) throws InterruptedException {
numberOfConsumedMessages.incrementAndGet();
Thread.sleep(5);
}
}
这是对消费者/生产者的调用:
TransferQueue<Event> transferQueue = new LinkedTransferQueue<>();
ExecutorService exService = Executors.newFixedThreadPool(2);
Producer producer = new Producer( transferQueue, "1", 2);
Consumer consumer = new Consumer(transferQueue, "1", 2);
exService.execute(producer);
exService.execute(consumer);
boolean isShutDown = exService.awaitTermination(5000, TimeUnit.MILLISECONDS);
if (!isShutDown) {
exService.shutdown();
}
生产者将只创建有限数量的字符供消费者消费。我怎么知道制作人是否完成角色生成?
我考虑实施超时以了解生产者是否不再发送任何字符,但此实施可能有更好的选择。
您可以只从生产者发送一个带有消息的事件,例如。 “完成”。
然后在您的消费者中,只需检查此消息即可知道流已完成。
超时不是个好主意,因为它可能出于与关闭流不同的原因而发生。
有多种替代方法可以实现此目的:
使用特殊类型的事件来表明生产者已经完成。 (这基本上就是 Krzysztof Cichocki 的回答所暗示的)。优点:简单。缺点:您必须确保您选择表示“完成”的任何特殊事件都不可能是生产者发出的真实事件。
使用计数。看起来这就是您的代码已经在尝试执行的操作。例如,将
numberOfMessagesToConsume
参数中的 15 传递给消费者构造函数,然后Run()
方法在消费了 15 条消息后停止。优点:简单。缺点:不灵活,你可能事先不知道生产者会产生多少消息。监控生产者线程的状态。例如,消费者可以检查
while (producerThread.isAlive()) {...}
。生产者线程将在完成消息生产后终止。优点:灵活性。缺点:您不希望消费者了解生产者线程,因为耦合度太高。例如,您可以使用new Thread(...)
启动生产者,或者您可以使用ExecutorService
或CompletableFuture
。消费者不需要知道。
减轻选项 3 缺点的一种方法是将一个函数传递给消费者,以将生产者状态的测试与线程细节分离:
构造函数:
Consumer(TransferQueue<MyEvent> transferQueue, String name, BooleanSupplier isProducerStillProducing)
使用 lambda 调用构造函数:
new Consumer(transferQueue, name, () -> producerThread.isAlive())
在run()
方法中测试:
while (isProducerStillProducing.getAsBoolean()) { ... }
有时协调生产者和消费者的关闭可能是一项非常令人费解的任务。有时,由于语法差异,一种编程语言比另一种编程语言更容易。 以下使用 Ada 编程语言编写的示例创建了一个生产者和一个消费者。生产者向消费者发送一系列字符。消费者在接收到每个字符时将其打印出来。生产者终止时消费者也终止。
此示例使用 Ada Rendezvous 机制在任务(也称为线程)之间进行通信。
with Ada.Text_IO; use Ada.Text_IO;
procedure Main is
task producer;
task consumer is
entry send (Item : in Character);
end consumer;
task body producer is
subtype lower is Character range 'a' .. 'z';
subtype upper is Character range 'A' .. 'Z';
begin
for C in lower loop
consumer.send (C);
delay 0.05;
end loop;
for C in upper loop
consumer.send (C);
delay 0.05;
end loop;
end producer;
task body consumer is
Char : Character;
begin
loop
select
accept send (Item : in Character) do
Char := Item;
end send;
Put (Char);
if Char = 'z' then
New_Line(2);
end if;
or
terminate;
end select;
end loop;
end consumer;
begin
null;
end Main;
这个程序的输出是:
abcdefghijklmnopqrstuvwxyz ABCDEFGHIJKLMNOPQRSTUVWXYZ