如何正确地让多个线程访问共享缓冲区
How to properly have multiple threads access shared buffer
我有一个 Producer/Consumer 生产者生产域供消费者访问的情况。消费者发送 https 请求并从页面中获取链接并将它们提交回生产者。当生产者完成时,消费者没有并挂在最终域上。我一辈子都弄不明白为什么会这样。
I have simplified my question
主要:
public class Main {
public static void main(String[] args) throws InterruptedException {
try
{
Broker broker = new Broker();
ExecutorService threadPool = Executors.newFixedThreadPool(3);
threadPool.execute(new Consumer(broker));
threadPool.execute(new Consumer(broker));
Future producerStatus = threadPool.submit(new Producer(broker));
// this will wait for the producer to finish its execution.
producerStatus.get();
threadPool.shutdown();
}
catch (Exception e)
{
e.printStackTrace();
}
}
}
经纪人:
public class Broker {
private BlockingQueue<String> QUEUE = new LinkedBlockingQueue<String>();
public Boolean continueProducing = Boolean.TRUE;
public void put(String data) throws InterruptedException
{
this.QUEUE.put(data);
}
public String get() throws InterruptedException
{
//return this.queue.poll(1, TimeUnit.SECONDS);
return this.QUEUE.take();
}
}
消费者:
public class Consumer implements Runnable{
private Broker broker;
public Consumer(Broker broker) {
this.broker = broker;
}
@Override
public void run() {
try {
String data = broker.get();
while (broker.continueProducing || data != null)
{
Thread.sleep(1000);
System.out.println("Consumer " + Thread.currentThread().getName() + " processed data from broker: " + data);
data = broker.get();
}
System.out.println("Comsumer " + Thread.currentThread().getName() + " finished its job; terminating.");
} catch (InterruptedException e) {
// TODO Auto-generated catch block
Thread.currentThread().interrupt();
e.printStackTrace();
}
}
}
制作人:
public class Producer implements Runnable{
private Broker broker;
public Producer(Broker broker) {
this.broker = broker;
}
@Override
public void run() {
try
{
for (int i = 0; i < 2; ++i) {
System.out.println("Producer produced: " + "https://example" + i + ".com");
Thread.sleep(100);
broker.put("https://example" + i + ".com");
}
//broker.put("https://example.com/2");
this.broker.continueProducing = Boolean.FALSE;
System.out.println("Producer finished its job; terminating.");
}catch(Exception e)
{
e.printStackTrace();
Thread.currentThread().interrupt();
}
}
}
更新的答案:
当我运行你的代码时,消费者卡在了线上data = broker.get()
。代理正在调用 BlockingQueue.take
方法。这是此方法的 Java 文档(重点是我的):
Retrieves and removes the head of this queue, waiting if necessary until an element becomes available.
这意味着即使生产者不生产任何东西,消费者仍然会等待生产。
一种可能的解决方案是使用“poison pill”方法。假设您只有一位制作人,您的 Broker
class 可能如下所示:
public class Broker {
private static final String POISON_PILL = "__POISON_PILL__";
private BlockingQueue<String> queue = new LinkedBlockingQueue<>();
public void put(String data) {
queue.add(data);
}
public void doneProducing() {
queue.add(POISON_PILL);
}
public String get() throws InterruptedException {
String result = queue.take();
if (result.equals(POISON_PILL)) {
queue.add(POISON_PILL);
return null;
} else {
return result;
}
}
}
上一个代码的答案:
如果你能缩小这个问题的范围,让它只包含最少量的代码来获得死锁,那就太好了。目前的情况是,您发布了很多不相关的代码,并且有一些相关的代码您没有发布。
此外,您当前的代码存在很多问题。您的 toLinkedHashSet
方法无法编译。在您的 add 方法中,您正在调用 BlockingQueue.put
方法,即使您的 BlockingQueue
永远不会达到其限制。您声称 contains
需要 Ο(1) 时间,但您的代码有 Ο(n) 时间。您似乎还在 addAll
和 contains
方法中进行了很多不必要的复制。
这里没有足够的信息让我知道问题出在哪里,但是可能导致您的问题的一件事是您的 get
方法。如果消费者线程被中断,那么您的 get
方法将导致它自身不中断(这可能不会导致死锁,但可能看起来像死锁)。在 Java 中,忽略异常是很少见的。如果您对 take
方法的调用抛出 InterruptedException
,这是有原因的:另一个线程希望当前线程停止。您的 get
方法应该抛出 InterruptedException
。例如:
public String get() throws InterruptedException {
return unprocessed.take();
}
如果您确实需要 get
方法不抛出 InterruptedException
,您可以抛出其他一些包含 InterruptedException
的链式异常。如果真的适合 return ""
中断,你可以这样做:
public String get() {
try {
return unprocessed.take();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
return "";
}
}
通过中断当前线程,您可以确保至少当前线程被标记为已中断,以便在线下的某些东西可以处理它。但是如果可能的话,抛出 InterruptedException
可能是最合适的。
我仍然不明白您为什么要为 LinkedBlockingQueue
创建自己的包装器,而不是只使用 LinkedBlockingQueue
本身。似乎您在 LinkedBlockingQueue
之上添加的所有内容都只会减慢速度。
我有一个 Producer/Consumer 生产者生产域供消费者访问的情况。消费者发送 https 请求并从页面中获取链接并将它们提交回生产者。当生产者完成时,消费者没有并挂在最终域上。我一辈子都弄不明白为什么会这样。
I have simplified my question
主要:
public class Main {
public static void main(String[] args) throws InterruptedException {
try
{
Broker broker = new Broker();
ExecutorService threadPool = Executors.newFixedThreadPool(3);
threadPool.execute(new Consumer(broker));
threadPool.execute(new Consumer(broker));
Future producerStatus = threadPool.submit(new Producer(broker));
// this will wait for the producer to finish its execution.
producerStatus.get();
threadPool.shutdown();
}
catch (Exception e)
{
e.printStackTrace();
}
}
}
经纪人:
public class Broker {
private BlockingQueue<String> QUEUE = new LinkedBlockingQueue<String>();
public Boolean continueProducing = Boolean.TRUE;
public void put(String data) throws InterruptedException
{
this.QUEUE.put(data);
}
public String get() throws InterruptedException
{
//return this.queue.poll(1, TimeUnit.SECONDS);
return this.QUEUE.take();
}
}
消费者:
public class Consumer implements Runnable{
private Broker broker;
public Consumer(Broker broker) {
this.broker = broker;
}
@Override
public void run() {
try {
String data = broker.get();
while (broker.continueProducing || data != null)
{
Thread.sleep(1000);
System.out.println("Consumer " + Thread.currentThread().getName() + " processed data from broker: " + data);
data = broker.get();
}
System.out.println("Comsumer " + Thread.currentThread().getName() + " finished its job; terminating.");
} catch (InterruptedException e) {
// TODO Auto-generated catch block
Thread.currentThread().interrupt();
e.printStackTrace();
}
}
}
制作人:
public class Producer implements Runnable{
private Broker broker;
public Producer(Broker broker) {
this.broker = broker;
}
@Override
public void run() {
try
{
for (int i = 0; i < 2; ++i) {
System.out.println("Producer produced: " + "https://example" + i + ".com");
Thread.sleep(100);
broker.put("https://example" + i + ".com");
}
//broker.put("https://example.com/2");
this.broker.continueProducing = Boolean.FALSE;
System.out.println("Producer finished its job; terminating.");
}catch(Exception e)
{
e.printStackTrace();
Thread.currentThread().interrupt();
}
}
}
更新的答案:
当我运行你的代码时,消费者卡在了线上data = broker.get()
。代理正在调用 BlockingQueue.take
方法。这是此方法的 Java 文档(重点是我的):
Retrieves and removes the head of this queue, waiting if necessary until an element becomes available.
这意味着即使生产者不生产任何东西,消费者仍然会等待生产。
一种可能的解决方案是使用“poison pill”方法。假设您只有一位制作人,您的 Broker
class 可能如下所示:
public class Broker {
private static final String POISON_PILL = "__POISON_PILL__";
private BlockingQueue<String> queue = new LinkedBlockingQueue<>();
public void put(String data) {
queue.add(data);
}
public void doneProducing() {
queue.add(POISON_PILL);
}
public String get() throws InterruptedException {
String result = queue.take();
if (result.equals(POISON_PILL)) {
queue.add(POISON_PILL);
return null;
} else {
return result;
}
}
}
上一个代码的答案:
如果你能缩小这个问题的范围,让它只包含最少量的代码来获得死锁,那就太好了。目前的情况是,您发布了很多不相关的代码,并且有一些相关的代码您没有发布。
此外,您当前的代码存在很多问题。您的 toLinkedHashSet
方法无法编译。在您的 add 方法中,您正在调用 BlockingQueue.put
方法,即使您的 BlockingQueue
永远不会达到其限制。您声称 contains
需要 Ο(1) 时间,但您的代码有 Ο(n) 时间。您似乎还在 addAll
和 contains
方法中进行了很多不必要的复制。
这里没有足够的信息让我知道问题出在哪里,但是可能导致您的问题的一件事是您的 get
方法。如果消费者线程被中断,那么您的 get
方法将导致它自身不中断(这可能不会导致死锁,但可能看起来像死锁)。在 Java 中,忽略异常是很少见的。如果您对 take
方法的调用抛出 InterruptedException
,这是有原因的:另一个线程希望当前线程停止。您的 get
方法应该抛出 InterruptedException
。例如:
public String get() throws InterruptedException {
return unprocessed.take();
}
如果您确实需要 get
方法不抛出 InterruptedException
,您可以抛出其他一些包含 InterruptedException
的链式异常。如果真的适合 return ""
中断,你可以这样做:
public String get() {
try {
return unprocessed.take();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
return "";
}
}
通过中断当前线程,您可以确保至少当前线程被标记为已中断,以便在线下的某些东西可以处理它。但是如果可能的话,抛出 InterruptedException
可能是最合适的。
我仍然不明白您为什么要为 LinkedBlockingQueue
创建自己的包装器,而不是只使用 LinkedBlockingQueue
本身。似乎您在 LinkedBlockingQueue
之上添加的所有内容都只会减慢速度。