具有阻塞队列的生产者-消费者设计模式 java
Producer-consumers design pattern with blocking Queue java
我必须创建向我们的客户发送 SMS 的消息发送应用程序。由于生成消息并向客户发送消息需要花费大量时间,因此我决定使用生产者和消费者模式来实现它。所以,这不会影响原来的执行流程。
一旦我将原始数据作为对象放入队列,这将由线程池中的一个消费者线程选择,然后生成消息并发送 SMS。一旦应用程序启动并且 运行.
,此流程应该继续
我的应用程序运行良好。但我发现消费者和生产者线程池创建的每个线程即使在完成发送短信任务后仍处于等待状态。长 运行 应用程序是否存在问题,或者我可以一直使用消费者和生产者线程池,而不是每次 initializingSendMessage(RawData trxn)[=30= 时都创建新线程池吗? ] 调用方法?
MessageSendUtil class 用于创建公共队列和初始化任务。
public class SendMessageUtil {
public static void initializingSendMessage(RawData trxn) {
BlockingQueue<Message> sharedQueue = new LinkedBlockingQueue<>();
ExecutorService produceMessagePool = Executors.newFixedThreadPool(1);
ExecutorService consumerMessagePool = Executors.newFixedThreadPool(5);
try {
produceMessagePool.submit(new Producer(sharedQueue));
int i = 0;
while (i++<5) {
consumerMessagePool.submit(new Consumer(sharedQueue));
}
produceMessagePool.shutdown();
consumerMessagePool.shutdown();
} catch (Exception ex){
System.out.println(ex.getMessage());
}
}
我的消费者和生产者 class 看起来像这样。
public class Producer implements Runnable {
private final BlockingQueue<Message> sharedQueue;
public Producer(BlockingQueue<Message> sharedQueue) {
this.sharedQueue = sharedQueue;
}
@Override
public void run() {
try {
Message message = new Message();
message.setMessage("Test message sending");
sharedQueue.add(message);
} catch (Exception err) {
err.printStackTrace();
}
}
}
/
public class Consumer implements Runnable {
private final BlockingQueue<Message> sharedQueue;
private MessageBroadcaster messageBroadcaster;
public Consumer(BlockingQueue<Message> sharedQueue) {
this.sharedQueue = sharedQueue;
}
@Override
public void run() {
initializeMessageBroadcaster();
//Send messages to customer
while(true){
try {
Message message = sharedQueue.take();
messageBroadcaster.sendMessage(message);
} catch (Exception err) {
err.printStackTrace();
}
}
}
private void initializeMessageBroadcaster() {
if(Objects.isNull(messageBroadcaster)){
messageBroadcaster = new MessageBroadcasterImpl();
}
}
}
在多次调用initializingSendMessage(RawData trxn) 后,实时线程显示如下。
My application works fine. but I found that each thread that consumer and producer thread pool creates stay alive in waiting state even after it finished the send SMS task.
没错。您的消费者线程任务位于:
while (true) {
它永远不会停止。有很多方法可以解决这个问题。一种方法是将恒定的“完成”消息放在队列中。您可以将与消费者相同数量的已完成消息放在队列中,也可以使用类似这样的逻辑:
private static final Message DONE_MESSAGE = new Message();
...
// producer puts it into the queue when it is done
sharedQueue.add(DONE_MESSAGE);
...
// consumer takes it from the queue and quits
Message message = sharedQueue.take();
if (message == DONE_MESSAGE) {
// put it back on the queue to stop the other consumer threads
sharedQueue.put(DONE_MESSAGE);
// quit the consumer thread
return;
}
is it a problem for long running application or can I use the the consumer and producer thread pool for all the time instead of creating new thread pool each time when initializingSendMessage(RawData trxn) method invoked?
我建议在应用程序的整个生命周期内保持相同的线程池 运行。重点是重用线程,而不是一直创建和关闭它们。此外,拥有 2 个固定线程池毫无意义。大小为 6 甚至只是使用缓存线程池的一个,它将为每个生产者和消费者作业创建一个线程。
我必须创建向我们的客户发送 SMS 的消息发送应用程序。由于生成消息并向客户发送消息需要花费大量时间,因此我决定使用生产者和消费者模式来实现它。所以,这不会影响原来的执行流程。
一旦我将原始数据作为对象放入队列,这将由线程池中的一个消费者线程选择,然后生成消息并发送 SMS。一旦应用程序启动并且 运行.
,此流程应该继续我的应用程序运行良好。但我发现消费者和生产者线程池创建的每个线程即使在完成发送短信任务后仍处于等待状态。长 运行 应用程序是否存在问题,或者我可以一直使用消费者和生产者线程池,而不是每次 initializingSendMessage(RawData trxn)[=30= 时都创建新线程池吗? ] 调用方法?
MessageSendUtil class 用于创建公共队列和初始化任务。
public class SendMessageUtil {
public static void initializingSendMessage(RawData trxn) {
BlockingQueue<Message> sharedQueue = new LinkedBlockingQueue<>();
ExecutorService produceMessagePool = Executors.newFixedThreadPool(1);
ExecutorService consumerMessagePool = Executors.newFixedThreadPool(5);
try {
produceMessagePool.submit(new Producer(sharedQueue));
int i = 0;
while (i++<5) {
consumerMessagePool.submit(new Consumer(sharedQueue));
}
produceMessagePool.shutdown();
consumerMessagePool.shutdown();
} catch (Exception ex){
System.out.println(ex.getMessage());
}
}
我的消费者和生产者 class 看起来像这样。
public class Producer implements Runnable {
private final BlockingQueue<Message> sharedQueue;
public Producer(BlockingQueue<Message> sharedQueue) {
this.sharedQueue = sharedQueue;
}
@Override
public void run() {
try {
Message message = new Message();
message.setMessage("Test message sending");
sharedQueue.add(message);
} catch (Exception err) {
err.printStackTrace();
}
}
}
/
public class Consumer implements Runnable {
private final BlockingQueue<Message> sharedQueue;
private MessageBroadcaster messageBroadcaster;
public Consumer(BlockingQueue<Message> sharedQueue) {
this.sharedQueue = sharedQueue;
}
@Override
public void run() {
initializeMessageBroadcaster();
//Send messages to customer
while(true){
try {
Message message = sharedQueue.take();
messageBroadcaster.sendMessage(message);
} catch (Exception err) {
err.printStackTrace();
}
}
}
private void initializeMessageBroadcaster() {
if(Objects.isNull(messageBroadcaster)){
messageBroadcaster = new MessageBroadcasterImpl();
}
}
}
在多次调用initializingSendMessage(RawData trxn) 后,实时线程显示如下。
My application works fine. but I found that each thread that consumer and producer thread pool creates stay alive in waiting state even after it finished the send SMS task.
没错。您的消费者线程任务位于:
while (true) {
它永远不会停止。有很多方法可以解决这个问题。一种方法是将恒定的“完成”消息放在队列中。您可以将与消费者相同数量的已完成消息放在队列中,也可以使用类似这样的逻辑:
private static final Message DONE_MESSAGE = new Message();
...
// producer puts it into the queue when it is done
sharedQueue.add(DONE_MESSAGE);
...
// consumer takes it from the queue and quits
Message message = sharedQueue.take();
if (message == DONE_MESSAGE) {
// put it back on the queue to stop the other consumer threads
sharedQueue.put(DONE_MESSAGE);
// quit the consumer thread
return;
}
is it a problem for long running application or can I use the the consumer and producer thread pool for all the time instead of creating new thread pool each time when initializingSendMessage(RawData trxn) method invoked?
我建议在应用程序的整个生命周期内保持相同的线程池 运行。重点是重用线程,而不是一直创建和关闭它们。此外,拥有 2 个固定线程池毫无意义。大小为 6 甚至只是使用缓存线程池的一个,它将为每个生产者和消费者作业创建一个线程。