Java - BlockingQueue 冻结多线程应用程序
Java - BlockingQueue freezes multithread application
我正在制作一个包含两个线程的应用程序:其中一个将值写入 LinkedBlockingQueue,另一个正在读取。我正在使用 ScheduledExecutorService 运行 在几秒钟内执行此操作。
问题是我的应用程序在 BlockingQueue 的方法 take 上冻结,我不明白为什么。
这是一个常用资源:
class Res{
AtomicInteger atomicInteger = new AtomicInteger(0);
BlockingQueue<String> q = new LinkedBlockingQueue<>();
}
这是reader
Semaphore semaphore = new Semaphore(1); /this is for reader does not take two places in thread pool
Runnable reader = ()->{
try {
semaphore.acquire();
System.out.println(res.q.take()+" "+res.atomicInteger.incrementAndGet());
semaphore.release();
} catch (InterruptedException e) {
e.printStackTrace();
}
};
作者:
Runnable writer = ()->{
res.q.add("hi");
};
完整代码:
class Res{
AtomicInteger atomicInteger = new AtomicInteger(0);
BlockingQueue<String> q = new LinkedBlockingQueue<>();
}
public class Main {
public static void main(String[] args) throws InterruptedException {
ScheduledExecutorService executorService = Executors.newScheduledThreadPool(2);
Res res = new Res();
Semaphore semaphore = new Semaphore(1); //this is for reader does not take two places in thread pool
Runnable reader = ()->{
try {
semaphore.acquire();
System.out.println(res.q.take()+" "+res.atomicInteger.incrementAndGet());
semaphore.release();
} catch (InterruptedException e) {
e.printStackTrace();
}
};
Runnable writer = ()->{
res.q.add("hi");
};
Random rnd = new Random();
for (int i = 0; i < 20; i++) {
int time = rnd.nextInt(5)+ 2;
executorService.schedule(writer,time, TimeUnit.SECONDS);
}
for (int i = 0; i < 20; i++) {
int time = rnd.nextInt(5)+ 2;
executorService.schedule(reader,time, TimeUnit.SECONDS);
}
executorService.shutdown();
}
它应该打印二十行 "hi [number]",但在某些行上冻结了。
比如我现在的打印:
hi 1
hi 2
hi 3
hi 4
hi 5
我发现如果我增加线程数 newScheduledThreadPool(20)
它就会开始工作,但是我怎样才能使用两个线程呢?谢谢!
要理解您的代码有点困难,但同时很明显发生了什么。由于 Executors.newScheduledThreadPool(2);
,您一次最多可以 运行 两个线程。这两个线程都是 reader
个线程。
所以 Thread-1
进入了 try
块并通过 semaphore.acquire();
获得了信号量许可,但是队列是空的 - 因此它在 res.q.take()
上阻塞。下一个线程 - Thread-2
也是一个 reader 线程,但它无法获取 permit
,因为它已被 Thread-1
占用并在 [=14= 上被阻塞].由于您没有其他线程的空间(您的池被阻止与这两个线程一起工作),因此没有作者会在您的队列中放置一些东西并因此取消阻止 Thread-1
(这样 res.q.take()
就可以工作).
添加更多的工作线程只会延迟问题的发生——您最终可能会陷入与之前相同的境地。
我正在制作一个包含两个线程的应用程序:其中一个将值写入 LinkedBlockingQueue,另一个正在读取。我正在使用 ScheduledExecutorService 运行 在几秒钟内执行此操作。 问题是我的应用程序在 BlockingQueue 的方法 take 上冻结,我不明白为什么。
这是一个常用资源:
class Res{
AtomicInteger atomicInteger = new AtomicInteger(0);
BlockingQueue<String> q = new LinkedBlockingQueue<>();
}
这是reader
Semaphore semaphore = new Semaphore(1); /this is for reader does not take two places in thread pool
Runnable reader = ()->{
try {
semaphore.acquire();
System.out.println(res.q.take()+" "+res.atomicInteger.incrementAndGet());
semaphore.release();
} catch (InterruptedException e) {
e.printStackTrace();
}
};
作者:
Runnable writer = ()->{
res.q.add("hi");
};
完整代码:
class Res{
AtomicInteger atomicInteger = new AtomicInteger(0);
BlockingQueue<String> q = new LinkedBlockingQueue<>();
}
public class Main {
public static void main(String[] args) throws InterruptedException {
ScheduledExecutorService executorService = Executors.newScheduledThreadPool(2);
Res res = new Res();
Semaphore semaphore = new Semaphore(1); //this is for reader does not take two places in thread pool
Runnable reader = ()->{
try {
semaphore.acquire();
System.out.println(res.q.take()+" "+res.atomicInteger.incrementAndGet());
semaphore.release();
} catch (InterruptedException e) {
e.printStackTrace();
}
};
Runnable writer = ()->{
res.q.add("hi");
};
Random rnd = new Random();
for (int i = 0; i < 20; i++) {
int time = rnd.nextInt(5)+ 2;
executorService.schedule(writer,time, TimeUnit.SECONDS);
}
for (int i = 0; i < 20; i++) {
int time = rnd.nextInt(5)+ 2;
executorService.schedule(reader,time, TimeUnit.SECONDS);
}
executorService.shutdown();
}
它应该打印二十行 "hi [number]",但在某些行上冻结了。 比如我现在的打印:
hi 1
hi 2
hi 3
hi 4
hi 5
我发现如果我增加线程数 newScheduledThreadPool(20)
它就会开始工作,但是我怎样才能使用两个线程呢?谢谢!
要理解您的代码有点困难,但同时很明显发生了什么。由于 Executors.newScheduledThreadPool(2);
,您一次最多可以 运行 两个线程。这两个线程都是 reader
个线程。
所以 Thread-1
进入了 try
块并通过 semaphore.acquire();
获得了信号量许可,但是队列是空的 - 因此它在 res.q.take()
上阻塞。下一个线程 - Thread-2
也是一个 reader 线程,但它无法获取 permit
,因为它已被 Thread-1
占用并在 [=14= 上被阻塞].由于您没有其他线程的空间(您的池被阻止与这两个线程一起工作),因此没有作者会在您的队列中放置一些东西并因此取消阻止 Thread-1
(这样 res.q.take()
就可以工作).
添加更多的工作线程只会延迟问题的发生——您最终可能会陷入与之前相同的境地。