Java 生产者消费者 ArrayBlockingQueue 在 take() 上死锁
Java Producer Consumer ArrayBlockingQueue deadlock on take()
在我的应用程序中有 2 个阶段,一个下载一些大数据,另一个处理它。
所以我创建了 2 类 来实现可运行的:ImageDownloader 和 ImageManipulator,它们共享一个 downloadedBlockingQueue:
public class ImageDownloader implements Runnable {
private ArrayBlockingQueue<ImageBean> downloadedImagesBlockingQueue;
private ArrayBlockingQueue<String> imgUrlsBlockingQueue;
public ImageDownloader(ArrayBlockingQueue<String> imgUrlsBlockingQueue, ArrayBlockingQueue<ImageBean> downloadedImagesBlockingQueue) {
this.downloadedImagesBlockingQueue = downloadedImagesBlockingQueue;
this.imgUrlsBlockingQueue = imgUrlsBlockingQueue;
}
@Override
public void run() {
while (!this.imgUrlsBlockingQueue.isEmpty()) {
try {
String imgUrl = this.imgUrlsBlockingQueue.take();
ImageBean imageBean = doYourThing(imgUrl);
this.downloadedImagesBlockingQueue.add(imageBean);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
public class ImageManipulator implements Runnable {
private ArrayBlockingQueue<ImageBean> downloadedImagesBlockingQueue;
private AtomicInteger capacity;
public ImageManipulator(ArrayBlockingQueue<ImageBean> downloadedImagesBlockingQueue,
AtomicInteger capacity) {
this.downloadedImagesBlockingQueue = downloadedImagesBlockingQueue;
this.capacity = capacity;
}
@Override
public void run() {
while (capacity.get() > 0) {
try {
ImageBean imageBean = downloadedImagesBlockingQueue.take(); // <- HERE I GET THE DEADLOCK
capacity.decrementAndGet();
} catch (InterruptedException e) {
e.printStackTrace();
}
// ....
}
}
}
public class Main {
public static void main(String[] args) {
String[] imageUrls = new String[]{"url1", "url2"};
int capacity = imageUrls.length;
ArrayBlockingQueue<String> imgUrlsBlockingQueue = initImgUrlsBlockingQueue(imageUrls, capacity);
ArrayBlockingQueue<ImageBean> downloadedImagesBlockingQueue = new ArrayBlockingQueue<>(capacity);
ExecutorService downloaderExecutor = Executors.newFixedThreadPool(3);
for (int i = 0; i < 3; i++) {
Runnable worker = new ImageDownloader(imgUrlsBlockingQueue, downloadedImagesBlockingQueue);
downloaderExecutor.execute(worker);
}
downloaderExecutor.shutdown();
ExecutorService manipulatorExecutor = Executors.newFixedThreadPool(3);
AtomicInteger manipulatorCapacity = new AtomicInteger(capacity);
for (int i = 0; i < 3; i++) {
Runnable worker = new ImageManipulator(downloadedImagesBlockingQueue, manipulatorCapacity);
manipulatorExecutor.execute(worker);
}
manipulatorExecutor.shutdown();
while (!downloaderExecutor.isTerminated() && !manipulatorExecutor.isTerminated()) {
}
}
}
发生死锁是因为这种情况:
t1 检查其容量 1.
t2 检查其 1.
t3 检查其 1.
t2 接受,将容量设置为 0,继续流并最终退出。
t1 和 t3 现在处于死锁状态,因为不会添加到 downloadedImagesBlockingQueue。
最终我想要这样的东西:当达到容量时 && 队列为空 = 打破 "while" 循环,并优雅地终止。
将 "is queue empty" 设置为唯一条件将不起作用,因为开始时它是空的,直到某些 ImageDownloader 将 imageBean 放入队列中。
您的消费者中不需要 capacity
。它现在在多个线程中读取和更新,这会导致同步问题。
initImgUrlsBlockingQueue
创建具有 capacity
个 URL 项目的 url 阻塞队列。 (对吗?)
ImageDownloader
消耗 imgUrlsBlockingQueue
并生成图像,当所有 URL 都下载完毕时终止,或者,如果 capacity
表示应该下载的图像数量下载因为可能有一些失败,它在添加 capacity
张图像时终止。
- 在
ImageDownloader
终止之前,它在downloadedImagesBlockingQueue
中添加了一个标记,例如,一个空元素,一个static final ImageBean static final ImageBean marker = new ImageBean()
.
所有 ImageManipulator
使用以下构造排空队列,当它看到空元素时,它再次将其添加到队列并终止。
// use identity comparison
while ((imageBean = downloadedImagesBlockingQueue.take()) != marker) {
// process image
}
downloadedImagesBlockingQueue.add(marker);
请注意,BlockingQueue
promises 它的方法调用它是原子的,但是,如果您先检查它的容量,并根据容量消耗一个元素,则操作组将不是原子的。
您可以采取一些措施来防止死锁:
- 使用容量为
的LinkedBlockingQueue
- 使用
offer
加入不阻塞的队列
- 使用
drainTo
或poll
从队列中取出未阻塞的项目
您可能还需要考虑一些提示:
- 使用
ThreadPool
:
final ExecutorService executorService = Executors.newFixedThreadPool(4);
- 如果您使用固定大小
ThreadPool
,您可以在将数据添加到与ThreadPool
大小相对应的队列中后添加"poison pill"s,并在您poll
使用 ThreadPool
就这么简单:
final ExecutorService executorService = Executors.newFixedThreadPool(4);
final Future<?> result = executorService.submit(new Runnable() {
@Override
public void run() {
}
});
还有一个鲜为人知的 ExecutorCompletionService
抽象了整个过程。更多信息 here.
您的问题是您正在尝试使用计数器来跟踪队列元素,而不是组合需要原子性的操作。你正在做检查,取,递减。这允许队列大小和计数器去同步并且你的线程永远阻塞。最好编写一个 'closeable' 的同步原语,这样您就不必保留关联的计数器。然而,一个快速的解决办法是改变它,这样你就可以原子地获取和递减计数器:
while (capacity.getAndDecrement() > 0) {
try {
ImageBean imageBean = downloadedImagesBlockingQueue.take();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
在这种情况下,如果有 3 个线程并且队列中只剩下一个元素,那么只有一个线程会自动递减计数器并确保它可以在不阻塞的情况下获取。其他两个线程都将看到 0 或 <0 并跳出循环。
您还需要使所有 class 实例变量成为最终变量,以便它们具有正确的内存可见性。您还应该确定如何处理中断,而不是依赖默认的打印跟踪模板。
好吧,我使用了一些建议的功能,但这对我来说是完整的解决方案,不需要忙于等待并等待下载程序通知它。
public ImageManipulator(LinkedBlockingQueue<ImageBean> downloadedImagesBlockingQueue,
LinkedBlockingQueue<ImageBean> manipulatedImagesBlockingQueue,
AtomicInteger capacity,
ManipulatedData manipulatedData,
ReentrantLock downloaderReentrantLock,
ReentrantLock manipulatorReentrantLock,
Condition downloaderNotFull,
Condition manipulatorNotFull) {
this.downloadedImagesBlockingQueue = downloadedImagesBlockingQueue;
this.manipulatedImagesBlockingQueue = manipulatedImagesBlockingQueue;
this.capacity = capacity;
this.downloaderReentrantLock = downloaderReentrantLock;
this.manipulatorReentrantLock = manipulatorReentrantLock;
this.downloaderNotFull = downloaderNotFull;
this.manipulatorNotFull = manipulatorNotFull;
this.manipulatedData = manipulatedData;
}
@Override
public void run() {
while (capacity.get() > 0) {
downloaderReentrantLock.lock();
if (capacity.get() > 0) { //checks if the value is updated.
ImageBean imageBean = downloadedImagesBlockingQueue.poll();
if (imageBean != null) { // will be null if no downloader finished is work (successfully downloaded or not)
capacity.decrementAndGet();
if (capacity.get() == 0) { //signal all the manipulators to wake up and stop waiting for downloaded images.
downloaderNotFull.signalAll();
}
downloaderReentrantLock.unlock();
if (imageBean.getOriginalImage() != null) { // the downloader will set it null iff it failes to download it.
// business logic
}
manipulatedImagesBlockingQueue.add(imageBean);
signalAllPersisters(); // signal the persisters (which has the same lock/unlock as this manipulator.
} else {
try {
downloaderNotFull.await(); //manipulator will wait for downloaded image - downloader will signalAllManipulators (same as signalAllPersisters() here) when an imageBean will be inserted to queue.
downloaderReentrantLock.unlock();
} catch (InterruptedException e) {
logger.log(Level.ERROR, e.getMessage(), e);
}
}
}
}
logger.log(Level.INFO, "Manipulator: " + Thread.currentThread().getId() + " Ended Gracefully");
}
private void signalAllPersisters() {
manipulatorReentrantLock.lock();
manipulatorNotFull.signalAll();
manipulatorReentrantLock.unlock();
}
如需完整流程,您可以在我的 github 上查看此项目:https://github.com/roy-key/image-service/
在我的应用程序中有 2 个阶段,一个下载一些大数据,另一个处理它。 所以我创建了 2 类 来实现可运行的:ImageDownloader 和 ImageManipulator,它们共享一个 downloadedBlockingQueue:
public class ImageDownloader implements Runnable {
private ArrayBlockingQueue<ImageBean> downloadedImagesBlockingQueue;
private ArrayBlockingQueue<String> imgUrlsBlockingQueue;
public ImageDownloader(ArrayBlockingQueue<String> imgUrlsBlockingQueue, ArrayBlockingQueue<ImageBean> downloadedImagesBlockingQueue) {
this.downloadedImagesBlockingQueue = downloadedImagesBlockingQueue;
this.imgUrlsBlockingQueue = imgUrlsBlockingQueue;
}
@Override
public void run() {
while (!this.imgUrlsBlockingQueue.isEmpty()) {
try {
String imgUrl = this.imgUrlsBlockingQueue.take();
ImageBean imageBean = doYourThing(imgUrl);
this.downloadedImagesBlockingQueue.add(imageBean);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
public class ImageManipulator implements Runnable {
private ArrayBlockingQueue<ImageBean> downloadedImagesBlockingQueue;
private AtomicInteger capacity;
public ImageManipulator(ArrayBlockingQueue<ImageBean> downloadedImagesBlockingQueue,
AtomicInteger capacity) {
this.downloadedImagesBlockingQueue = downloadedImagesBlockingQueue;
this.capacity = capacity;
}
@Override
public void run() {
while (capacity.get() > 0) {
try {
ImageBean imageBean = downloadedImagesBlockingQueue.take(); // <- HERE I GET THE DEADLOCK
capacity.decrementAndGet();
} catch (InterruptedException e) {
e.printStackTrace();
}
// ....
}
}
}
public class Main {
public static void main(String[] args) {
String[] imageUrls = new String[]{"url1", "url2"};
int capacity = imageUrls.length;
ArrayBlockingQueue<String> imgUrlsBlockingQueue = initImgUrlsBlockingQueue(imageUrls, capacity);
ArrayBlockingQueue<ImageBean> downloadedImagesBlockingQueue = new ArrayBlockingQueue<>(capacity);
ExecutorService downloaderExecutor = Executors.newFixedThreadPool(3);
for (int i = 0; i < 3; i++) {
Runnable worker = new ImageDownloader(imgUrlsBlockingQueue, downloadedImagesBlockingQueue);
downloaderExecutor.execute(worker);
}
downloaderExecutor.shutdown();
ExecutorService manipulatorExecutor = Executors.newFixedThreadPool(3);
AtomicInteger manipulatorCapacity = new AtomicInteger(capacity);
for (int i = 0; i < 3; i++) {
Runnable worker = new ImageManipulator(downloadedImagesBlockingQueue, manipulatorCapacity);
manipulatorExecutor.execute(worker);
}
manipulatorExecutor.shutdown();
while (!downloaderExecutor.isTerminated() && !manipulatorExecutor.isTerminated()) {
}
}
}
发生死锁是因为这种情况: t1 检查其容量 1.
t2 检查其 1.
t3 检查其 1.
t2 接受,将容量设置为 0,继续流并最终退出。 t1 和 t3 现在处于死锁状态,因为不会添加到 downloadedImagesBlockingQueue。
最终我想要这样的东西:当达到容量时 && 队列为空 = 打破 "while" 循环,并优雅地终止。
将 "is queue empty" 设置为唯一条件将不起作用,因为开始时它是空的,直到某些 ImageDownloader 将 imageBean 放入队列中。
您的消费者中不需要 capacity
。它现在在多个线程中读取和更新,这会导致同步问题。
initImgUrlsBlockingQueue
创建具有capacity
个 URL 项目的 url 阻塞队列。 (对吗?)ImageDownloader
消耗imgUrlsBlockingQueue
并生成图像,当所有 URL 都下载完毕时终止,或者,如果capacity
表示应该下载的图像数量下载因为可能有一些失败,它在添加capacity
张图像时终止。- 在
ImageDownloader
终止之前,它在downloadedImagesBlockingQueue
中添加了一个标记,例如,一个空元素,一个static final ImageBeanstatic final ImageBean marker = new ImageBean()
. 所有
ImageManipulator
使用以下构造排空队列,当它看到空元素时,它再次将其添加到队列并终止。// use identity comparison while ((imageBean = downloadedImagesBlockingQueue.take()) != marker) { // process image } downloadedImagesBlockingQueue.add(marker);
请注意,BlockingQueue
promises 它的方法调用它是原子的,但是,如果您先检查它的容量,并根据容量消耗一个元素,则操作组将不是原子的。
您可以采取一些措施来防止死锁:
- 使用容量为 的
- 使用
offer
加入不阻塞的队列 - 使用
drainTo
或poll
从队列中取出未阻塞的项目
LinkedBlockingQueue
您可能还需要考虑一些提示:
- 使用
ThreadPool
:
final ExecutorService executorService = Executors.newFixedThreadPool(4);
- 如果您使用固定大小
ThreadPool
,您可以在将数据添加到与ThreadPool
大小相对应的队列中后添加"poison pill"s,并在您poll
使用 ThreadPool
就这么简单:
final ExecutorService executorService = Executors.newFixedThreadPool(4);
final Future<?> result = executorService.submit(new Runnable() {
@Override
public void run() {
}
});
还有一个鲜为人知的 ExecutorCompletionService
抽象了整个过程。更多信息 here.
您的问题是您正在尝试使用计数器来跟踪队列元素,而不是组合需要原子性的操作。你正在做检查,取,递减。这允许队列大小和计数器去同步并且你的线程永远阻塞。最好编写一个 'closeable' 的同步原语,这样您就不必保留关联的计数器。然而,一个快速的解决办法是改变它,这样你就可以原子地获取和递减计数器:
while (capacity.getAndDecrement() > 0) {
try {
ImageBean imageBean = downloadedImagesBlockingQueue.take();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
在这种情况下,如果有 3 个线程并且队列中只剩下一个元素,那么只有一个线程会自动递减计数器并确保它可以在不阻塞的情况下获取。其他两个线程都将看到 0 或 <0 并跳出循环。
您还需要使所有 class 实例变量成为最终变量,以便它们具有正确的内存可见性。您还应该确定如何处理中断,而不是依赖默认的打印跟踪模板。
好吧,我使用了一些建议的功能,但这对我来说是完整的解决方案,不需要忙于等待并等待下载程序通知它。
public ImageManipulator(LinkedBlockingQueue<ImageBean> downloadedImagesBlockingQueue,
LinkedBlockingQueue<ImageBean> manipulatedImagesBlockingQueue,
AtomicInteger capacity,
ManipulatedData manipulatedData,
ReentrantLock downloaderReentrantLock,
ReentrantLock manipulatorReentrantLock,
Condition downloaderNotFull,
Condition manipulatorNotFull) {
this.downloadedImagesBlockingQueue = downloadedImagesBlockingQueue;
this.manipulatedImagesBlockingQueue = manipulatedImagesBlockingQueue;
this.capacity = capacity;
this.downloaderReentrantLock = downloaderReentrantLock;
this.manipulatorReentrantLock = manipulatorReentrantLock;
this.downloaderNotFull = downloaderNotFull;
this.manipulatorNotFull = manipulatorNotFull;
this.manipulatedData = manipulatedData;
}
@Override
public void run() {
while (capacity.get() > 0) {
downloaderReentrantLock.lock();
if (capacity.get() > 0) { //checks if the value is updated.
ImageBean imageBean = downloadedImagesBlockingQueue.poll();
if (imageBean != null) { // will be null if no downloader finished is work (successfully downloaded or not)
capacity.decrementAndGet();
if (capacity.get() == 0) { //signal all the manipulators to wake up and stop waiting for downloaded images.
downloaderNotFull.signalAll();
}
downloaderReentrantLock.unlock();
if (imageBean.getOriginalImage() != null) { // the downloader will set it null iff it failes to download it.
// business logic
}
manipulatedImagesBlockingQueue.add(imageBean);
signalAllPersisters(); // signal the persisters (which has the same lock/unlock as this manipulator.
} else {
try {
downloaderNotFull.await(); //manipulator will wait for downloaded image - downloader will signalAllManipulators (same as signalAllPersisters() here) when an imageBean will be inserted to queue.
downloaderReentrantLock.unlock();
} catch (InterruptedException e) {
logger.log(Level.ERROR, e.getMessage(), e);
}
}
}
}
logger.log(Level.INFO, "Manipulator: " + Thread.currentThread().getId() + " Ended Gracefully");
}
private void signalAllPersisters() {
manipulatorReentrantLock.lock();
manipulatorNotFull.signalAll();
manipulatorReentrantLock.unlock();
}
如需完整流程,您可以在我的 github 上查看此项目:https://github.com/roy-key/image-service/