等待所有阻塞队列元素取出后处理
Wait for all blocking queue elements to be processed after they are taken out
在以下情况下,终结器线程必须等待消费者线程处理完所有队列元素才能完成执行:
private final BlockingQueue<Object> queue = new LinkedBlockingQueue<>();
private final Object queueMonitor = new Object();
// Consumer thread
while (true) {
Object element = queue.take();
consume(element);
synchronized (queueMonitor) {
queueMonitor.notifyAll();
}
}
// Finalizer thread
synchronized (queueMonitor) {
while (!queue.isEmpty()) {
queueMonitor.wait();
}
}
随着时间的推移,元素被添加到队列中。
消费者守护线程一直运行到 JVM 终止,此时必须允许它完成所有排队元素的处理。
目前这是由终结器线程完成的,它是一个关闭钩子,应该延迟 JVM 终止时消费者线程的终止。
问题:
如果在从队列中取出最后一个元素后启动终结器线程,则 while 循环条件的计算结果为 false
,因此执行完成而 consume()
尚未返回,因为等待 [=13] =] 被完全跳过。
研究:
一个理想的解决方案是 peek the queue,然后在元素被消耗后删除它。
一种方法是使用 CountDownLatch
– 在其上放置终结器块,并在 consume()
之后调用消费者倒计时。
基本上不在队列上阻塞,在任务完成时阻塞。
private final BlockingQueue<Object> queue = new LinkedBlockingQueue<>();
private volatile boolean running = true;
private final CountDownLatch terminationLatch = new CountDownLatch(1);
// Consumer thread
while (running || !queue.isEmpty()) {
Object element = queue.poll(100, TimeUnit.MILLISECONDS);
if (element == null) continue;
consume(element);
}
terminationLatch.countDown();
// Finalizer thread
running = false;
terminationLatch.await();
在以下情况下,终结器线程必须等待消费者线程处理完所有队列元素才能完成执行:
private final BlockingQueue<Object> queue = new LinkedBlockingQueue<>();
private final Object queueMonitor = new Object();
// Consumer thread
while (true) {
Object element = queue.take();
consume(element);
synchronized (queueMonitor) {
queueMonitor.notifyAll();
}
}
// Finalizer thread
synchronized (queueMonitor) {
while (!queue.isEmpty()) {
queueMonitor.wait();
}
}
随着时间的推移,元素被添加到队列中。 消费者守护线程一直运行到 JVM 终止,此时必须允许它完成所有排队元素的处理。 目前这是由终结器线程完成的,它是一个关闭钩子,应该延迟 JVM 终止时消费者线程的终止。
问题:
如果在从队列中取出最后一个元素后启动终结器线程,则 while 循环条件的计算结果为 false
,因此执行完成而 consume()
尚未返回,因为等待 [=13] =] 被完全跳过。
研究:
一个理想的解决方案是 peek the queue,然后在元素被消耗后删除它。
一种方法是使用 CountDownLatch
– 在其上放置终结器块,并在 consume()
之后调用消费者倒计时。
基本上不在队列上阻塞,在任务完成时阻塞。
private final BlockingQueue<Object> queue = new LinkedBlockingQueue<>();
private volatile boolean running = true;
private final CountDownLatch terminationLatch = new CountDownLatch(1);
// Consumer thread
while (running || !queue.isEmpty()) {
Object element = queue.poll(100, TimeUnit.MILLISECONDS);
if (element == null) continue;
consume(element);
}
terminationLatch.countDown();
// Finalizer thread
running = false;
terminationLatch.await();