在 Java 中防止生产者和消费者的有界执行程序服务中可能发生的死锁情况
Preventing possible deadlock scenarios in bounded executor services with producers and consumers in Java
考虑这个示例代码:(我已经简化了 类 很多,所以它们更容易阅读)
制作人
class RandomIntegerProducer implements Callable<Void>
{
private final BlockingQueue<? super Integer> queue;
private final Random random;
/* Boilerplate constructor... */
@Override
public Void call()
{
while (!Thread.interrupted())
{
try {
TimeUnit.SECONDS.sleep(1);
queue.put(random.nextInt());
} catch (InterruptedException e)
{
Thread.currentThread().interrupt();
break;
}
}
return null;
}
}
这是一个简单、简洁的任务示例,每秒将一个随机数放入队列中,并且可以使用 Thread.interrupt()
取消。
消费者
class NumberConsumer implements Callable<Void>
{
private final BlockingQueue<? extends Number> queue;
private final Appendable target;
/* Boilerplate constructor... */
@Override
public Void call() throws IOException
{
while (!Thread.interrupted())
{
try {
target.append(queue.take().toString());
} catch (InterruptedException e)
{
Thread.currentThread().interrupt();
break;
}
}
return null;
}
}
消费者从队列中取出数字并打印到指定的Appendable
。可以通过Thread.interrupt()
.
取消
起始码
class ProducerConsumerStarter
{
/* Notice this is a fixed size (e.g. bounded) executor service */
private static final ExecutorService SERVICE = Executors.newFixedThreadPool(8);
public static List<Future<Void>> startIntegerProducerConsumer(int producers, int consumers)
{
List<Callable<Void>> callables = new ArrayList<>();
BlockingQueue<Integer> commonQueue = new ArrayBlockingQueue<>(16);
for (int i = 0; i < producers; i++)
{
callables.add(new RandomIntegerProducer(commonQueue, new Random()));
}
for (int i = 0; i < consumers; i++)
{
callables.add(new NumberConsumer(commonQueue, System.out));
}
// Submit them all (in order)
return callables.stream().map(SERVICE::submit).collect(Collectors.toList());
}
}
此实用程序方法将任务提交到有界执行程序服务(按顺序 - 首先是所有生产者,然后是所有消费者)
失败的客户端代码
public class FailingExaple {
@org.junit.Test
public void deadlockApplication() throws Exception
{
List<Future<Void>> futures = ProducerConsumerStarter.startIntegerProducerConsumer(10, 10);
for (Future<Void> future : futures)
{
System.out.println("Getting future");
future.get();
}
}
}
此示例代码通过将它和任何其他未来的起始代码调用者死锁而使该并发程序失败。
问题是:我怎样才能既防止我的应用程序在高负载下产生大量线程(我希望任务改为排队),又能防止执行程序仅被生产者污染而导致死锁?
即使这个样本在 100% 的时间里都明显失败,考虑一个并发程序,在不幸的情况下,只用生产者完全填充有界执行器——你会遇到同样的一般问题。
什么是死锁? Java Documentation
Deadlock describes a situation where two or more threads are blocked
forever, waiting for each other.
因此,当第一个线程持有监视器 1 并尝试获取监视器 2,而第二个线程持有监视器 2 并尝试获取监视器 1 时,就会发生死锁。
您的代码中没有死锁,因为没有 two or more threads .. waiting for each other
。有生产者在队列中等待 space 而没有消费者,因为由于执行者的线程数而没有安排他们。
此外 "The client code that fails it" 将始终阻塞线程,即使 startIntegerProducerConsumer(1,1)
public class FailingExaple {
@org.junit.Test
public void deadlockApplication() throws Exception
{
List<Future<Void>> futures = ProducerConsumerStarter.startIntegerProducerConsumer(10, 10);
for (Future<Void> future : futures)
{
System.out.println("Getting future");
future.get();
}
}
}
因为您的生产者和消费者保持 运行 直到发生显式中断,这在 deadlockApplication()
中不会发生。
您的代码应如下所示
for (Future<Void> future : futures)
{
if (future.isDone()) {
try {
System.out.println("Getting future");
future.get();
} catch (CancellationException ce) {
} catch (ExecutionException ee) {
}
} else {
System.out.println("The future is not done, cancelling it");
if (future.cancel(true)) {
System.out.println("task was cancelled");
} else {
//handle case when FutureTask#cancel(boolean mayInterruptIfRunning) wasn't cancelled
}
}
}
此循环将获取已完成任务的结果并取消未完成的任务。
@vanOekel说得对,最好有两个线程池,一个给消费者,一个给生产者。
像这样
class ProducerConsumerStarter
{
private static final ExecutorService CONSUMERS = Executors.newFixedThreadPool(8);
private static final ExecutorService PRODUCERS = Executors.newFixedThreadPool(8);
public static List<Future<Void>> startIntegerProducerConsumer(int producers, int consumers) {
...
}
}
和 startIntegerProducerConsumer(int, int)
相应地提交消费者和生产者。
但是在这种情况下,新的任务将排队等待之前提交的生产者和消费者完成(如果这些任务不被中断就不会发生)。
您还可以进一步优化生产者的代码。首先更改代码
class RandomIntegerProducer implements Runnable
{
private final BlockingQueue<? super Integer> queue;
private final Random random;
...
@Override
public void run()
{
queue.offer(random.nextInt());
}
}
然后开始使用 scheduleWithFixedDelay(producer, 1, 1, TimeUnit.SECONDS)
将生产者提交到 ScheduledExecutorService。此更改将有助于保持生产者 运行 而不会相互阻塞。但它也会稍微改变应用程序的语义。
您可以将 ScheduledExecutorService
(对于生产者)初始化为 class 变量。唯一的不便是您必须将 startIntegerProducerConsumer(int producers, int consumers)
方法的 return 类型更改为 List<Future<?>>
但实际上 scheduleWithFixedDelay(..)
中的 ScheduledFutures<?>
return 仍然会类型为 Future<Void>
。
如果可能的话,您可以对消费者做同样的事情,在使用新生成的数字期间,最大延迟等于 delay
(传递到 scheduleWithFixedDelay()
)对您来说很好。
希望我的回答对您有所帮助。
考虑这个示例代码:(我已经简化了 类 很多,所以它们更容易阅读)
制作人
class RandomIntegerProducer implements Callable<Void>
{
private final BlockingQueue<? super Integer> queue;
private final Random random;
/* Boilerplate constructor... */
@Override
public Void call()
{
while (!Thread.interrupted())
{
try {
TimeUnit.SECONDS.sleep(1);
queue.put(random.nextInt());
} catch (InterruptedException e)
{
Thread.currentThread().interrupt();
break;
}
}
return null;
}
}
这是一个简单、简洁的任务示例,每秒将一个随机数放入队列中,并且可以使用 Thread.interrupt()
取消。
消费者
class NumberConsumer implements Callable<Void>
{
private final BlockingQueue<? extends Number> queue;
private final Appendable target;
/* Boilerplate constructor... */
@Override
public Void call() throws IOException
{
while (!Thread.interrupted())
{
try {
target.append(queue.take().toString());
} catch (InterruptedException e)
{
Thread.currentThread().interrupt();
break;
}
}
return null;
}
}
消费者从队列中取出数字并打印到指定的Appendable
。可以通过Thread.interrupt()
.
起始码
class ProducerConsumerStarter
{
/* Notice this is a fixed size (e.g. bounded) executor service */
private static final ExecutorService SERVICE = Executors.newFixedThreadPool(8);
public static List<Future<Void>> startIntegerProducerConsumer(int producers, int consumers)
{
List<Callable<Void>> callables = new ArrayList<>();
BlockingQueue<Integer> commonQueue = new ArrayBlockingQueue<>(16);
for (int i = 0; i < producers; i++)
{
callables.add(new RandomIntegerProducer(commonQueue, new Random()));
}
for (int i = 0; i < consumers; i++)
{
callables.add(new NumberConsumer(commonQueue, System.out));
}
// Submit them all (in order)
return callables.stream().map(SERVICE::submit).collect(Collectors.toList());
}
}
此实用程序方法将任务提交到有界执行程序服务(按顺序 - 首先是所有生产者,然后是所有消费者)
失败的客户端代码
public class FailingExaple {
@org.junit.Test
public void deadlockApplication() throws Exception
{
List<Future<Void>> futures = ProducerConsumerStarter.startIntegerProducerConsumer(10, 10);
for (Future<Void> future : futures)
{
System.out.println("Getting future");
future.get();
}
}
}
此示例代码通过将它和任何其他未来的起始代码调用者死锁而使该并发程序失败。
问题是:我怎样才能既防止我的应用程序在高负载下产生大量线程(我希望任务改为排队),又能防止执行程序仅被生产者污染而导致死锁?
即使这个样本在 100% 的时间里都明显失败,考虑一个并发程序,在不幸的情况下,只用生产者完全填充有界执行器——你会遇到同样的一般问题。
什么是死锁? Java Documentation
Deadlock describes a situation where two or more threads are blocked forever, waiting for each other.
因此,当第一个线程持有监视器 1 并尝试获取监视器 2,而第二个线程持有监视器 2 并尝试获取监视器 1 时,就会发生死锁。
您的代码中没有死锁,因为没有 two or more threads .. waiting for each other
。有生产者在队列中等待 space 而没有消费者,因为由于执行者的线程数而没有安排他们。
此外 "The client code that fails it" 将始终阻塞线程,即使 startIntegerProducerConsumer(1,1)
public class FailingExaple {
@org.junit.Test
public void deadlockApplication() throws Exception
{
List<Future<Void>> futures = ProducerConsumerStarter.startIntegerProducerConsumer(10, 10);
for (Future<Void> future : futures)
{
System.out.println("Getting future");
future.get();
}
}
}
因为您的生产者和消费者保持 运行 直到发生显式中断,这在 deadlockApplication()
中不会发生。
您的代码应如下所示
for (Future<Void> future : futures)
{
if (future.isDone()) {
try {
System.out.println("Getting future");
future.get();
} catch (CancellationException ce) {
} catch (ExecutionException ee) {
}
} else {
System.out.println("The future is not done, cancelling it");
if (future.cancel(true)) {
System.out.println("task was cancelled");
} else {
//handle case when FutureTask#cancel(boolean mayInterruptIfRunning) wasn't cancelled
}
}
}
此循环将获取已完成任务的结果并取消未完成的任务。
@vanOekel说得对,最好有两个线程池,一个给消费者,一个给生产者。
像这样
class ProducerConsumerStarter
{
private static final ExecutorService CONSUMERS = Executors.newFixedThreadPool(8);
private static final ExecutorService PRODUCERS = Executors.newFixedThreadPool(8);
public static List<Future<Void>> startIntegerProducerConsumer(int producers, int consumers) {
...
}
}
和 startIntegerProducerConsumer(int, int)
相应地提交消费者和生产者。
但是在这种情况下,新的任务将排队等待之前提交的生产者和消费者完成(如果这些任务不被中断就不会发生)。
您还可以进一步优化生产者的代码。首先更改代码
class RandomIntegerProducer implements Runnable
{
private final BlockingQueue<? super Integer> queue;
private final Random random;
...
@Override
public void run()
{
queue.offer(random.nextInt());
}
}
然后开始使用 scheduleWithFixedDelay(producer, 1, 1, TimeUnit.SECONDS)
将生产者提交到 ScheduledExecutorService。此更改将有助于保持生产者 运行 而不会相互阻塞。但它也会稍微改变应用程序的语义。
您可以将 ScheduledExecutorService
(对于生产者)初始化为 class 变量。唯一的不便是您必须将 startIntegerProducerConsumer(int producers, int consumers)
方法的 return 类型更改为 List<Future<?>>
但实际上 scheduleWithFixedDelay(..)
中的 ScheduledFutures<?>
return 仍然会类型为 Future<Void>
。
如果可能的话,您可以对消费者做同样的事情,在使用新生成的数字期间,最大延迟等于 delay
(传递到 scheduleWithFixedDelay()
)对您来说很好。
希望我的回答对您有所帮助。