如果消费者线程遇到异常,如何停止生产者线程
How to stop the producer thread if consumer thread faces an exception
我有这种生产者消费者情况,我正在使用 arrayblockingqueue。
如果消费者线程遇到异常,如何停止生产者线程。
我需要生产者停止等待队列为空。
我引发了强制运行时异常。但是程序并没有退出。生产者一直在等待队列为空。有人可以帮忙吗
public class ServiceClass implements Runnable{
private final static BlockingQueue<Integer> processQueue = new ArrayBlockingQueue<>(10);
private static final int CONSUMER_COUNT = 1;
private boolean isConsumerInterrupted = false;
private boolean isConsumer = false;
private static boolean producerIsDone = false;
public ServiceClass(boolean consumer,boolean isConsumerInterrupted) {
this.isConsumer = consumer;
this.isConsumerInterrupted = isConsumerInterrupted;
}
public static void main(String[] args) {
long startTime = System.nanoTime();
ExecutorService producerPool = Executors.newFixedThreadPool(1);
producerPool.submit(new ServiceClass(false,false)); // run method is
// called
// create a pool of consumer threads to parse the lines read
ExecutorService consumerPool = Executors.newFixedThreadPool(CONSUMER_COUNT);
for (int i = 0; i < CONSUMER_COUNT; i++) {
consumerPool.submit(new ServiceClass(true,false)); // run method is
// called
}
producerPool.shutdown();
consumerPool.shutdown();
while (!producerPool.isTerminated() && !consumerPool.isTerminated()) {
}
long endTime = System.nanoTime();
long elapsedTimeInMillis = TimeUnit.MILLISECONDS.convert((endTime - startTime), TimeUnit.NANOSECONDS);
System.out.println("Total elapsed time: " + elapsedTimeInMillis + " ms");
}
@Override
public void run() {
if (isConsumer) {
consume();
} else {
readFile(); //produce data by reading a file
}
}
private void readFile() {
//Path file = Paths.get("c:/temp/my-large-file.csv");
try
{
for(int i =0;i<10000;i++) {
if(isConsumerInterrupted) {
break;
}
processQueue.put(i);
System.out.println("produced:" + i+"------"+processQueue.size());
}
//Java 8: Stream class
} catch (Exception e){
e.printStackTrace();
}
producerIsDone = true; // signal consumer
System.out.println(Thread.currentThread().getName() + " producer is done");
}
private void consume() {
try {
while (!producerIsDone || (producerIsDone && !processQueue.isEmpty())) {
System.out.println("consumed:" + processQueue.take()+"------"+processQueue.size());
throw new RuntimeException();
//System.out.println(Thread.currentThread().getName() + ":: consumer count:" + linesReadQueue.size());
}
System.out.println(Thread.currentThread().getName() + " consumer is done");
} catch (Exception e) {
isConsumerInterrupted=true;
}
}
}
您正在使用标志 isConsumerInterrupted
来终止生产者线程。这是错误的。消费者不消费队列中的元素,生产者一直生产直到队列满,然后开始阻塞直到队列未满。然后,当消费者抛出 RuntimeException 时,它会设置标志,并且生产者线程没有机会检查标志,因为 none 的消费者消耗队列中的元素,以便生产者可以从等待状态中出现。一种选择是使用未来并在消费者抛出与设置标志相反的异常时取消它。由于 processQueue.put
响应中断,它将成功终止生产者线程。如果在等待时被打断,则会抛出 InterruptedException
。这是它的样子。
private static Future<?> producerFuture = null;
public static void main(String[] args) {
// Remainder omitted.
producerFuture = producerPool.submit(new ServiceClass(false, false));
// ...
}
private void consume() {
try {
// ...
} catch (Exception e) {
producerFuture.cancel(true);
}
}
我有这种生产者消费者情况,我正在使用 arrayblockingqueue。 如果消费者线程遇到异常,如何停止生产者线程。 我需要生产者停止等待队列为空。 我引发了强制运行时异常。但是程序并没有退出。生产者一直在等待队列为空。有人可以帮忙吗
public class ServiceClass implements Runnable{
private final static BlockingQueue<Integer> processQueue = new ArrayBlockingQueue<>(10);
private static final int CONSUMER_COUNT = 1;
private boolean isConsumerInterrupted = false;
private boolean isConsumer = false;
private static boolean producerIsDone = false;
public ServiceClass(boolean consumer,boolean isConsumerInterrupted) {
this.isConsumer = consumer;
this.isConsumerInterrupted = isConsumerInterrupted;
}
public static void main(String[] args) {
long startTime = System.nanoTime();
ExecutorService producerPool = Executors.newFixedThreadPool(1);
producerPool.submit(new ServiceClass(false,false)); // run method is
// called
// create a pool of consumer threads to parse the lines read
ExecutorService consumerPool = Executors.newFixedThreadPool(CONSUMER_COUNT);
for (int i = 0; i < CONSUMER_COUNT; i++) {
consumerPool.submit(new ServiceClass(true,false)); // run method is
// called
}
producerPool.shutdown();
consumerPool.shutdown();
while (!producerPool.isTerminated() && !consumerPool.isTerminated()) {
}
long endTime = System.nanoTime();
long elapsedTimeInMillis = TimeUnit.MILLISECONDS.convert((endTime - startTime), TimeUnit.NANOSECONDS);
System.out.println("Total elapsed time: " + elapsedTimeInMillis + " ms");
}
@Override
public void run() {
if (isConsumer) {
consume();
} else {
readFile(); //produce data by reading a file
}
}
private void readFile() {
//Path file = Paths.get("c:/temp/my-large-file.csv");
try
{
for(int i =0;i<10000;i++) {
if(isConsumerInterrupted) {
break;
}
processQueue.put(i);
System.out.println("produced:" + i+"------"+processQueue.size());
}
//Java 8: Stream class
} catch (Exception e){
e.printStackTrace();
}
producerIsDone = true; // signal consumer
System.out.println(Thread.currentThread().getName() + " producer is done");
}
private void consume() {
try {
while (!producerIsDone || (producerIsDone && !processQueue.isEmpty())) {
System.out.println("consumed:" + processQueue.take()+"------"+processQueue.size());
throw new RuntimeException();
//System.out.println(Thread.currentThread().getName() + ":: consumer count:" + linesReadQueue.size());
}
System.out.println(Thread.currentThread().getName() + " consumer is done");
} catch (Exception e) {
isConsumerInterrupted=true;
}
}
}
您正在使用标志 isConsumerInterrupted
来终止生产者线程。这是错误的。消费者不消费队列中的元素,生产者一直生产直到队列满,然后开始阻塞直到队列未满。然后,当消费者抛出 RuntimeException 时,它会设置标志,并且生产者线程没有机会检查标志,因为 none 的消费者消耗队列中的元素,以便生产者可以从等待状态中出现。一种选择是使用未来并在消费者抛出与设置标志相反的异常时取消它。由于 processQueue.put
响应中断,它将成功终止生产者线程。如果在等待时被打断,则会抛出 InterruptedException
。这是它的样子。
private static Future<?> producerFuture = null;
public static void main(String[] args) {
// Remainder omitted.
producerFuture = producerPool.submit(new ServiceClass(false, false));
// ...
}
private void consume() {
try {
// ...
} catch (Exception e) {
producerFuture.cancel(true);
}
}