多个消费者线程整体消费队列FIFO

Multiple Consumer Threads Consume Queue FIFO Overall

由于我正在尝试学习 JAVA 编程的多线程部分,在处理一个生产者 - 多个消费者编码时遇到以下问题。

我想要实现的是:多个消费者线程按照项目放入队列的顺序从队列中取出项目。也就是说,让消费者线程整体保持先进先出的方式。

final BlockingDeque<String> deque = new LinkedBlockingDeque<String>();

Runnable rb = new Runnable() {
    public void run() {
        try {
            System.out.println(deque.takeLast());
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
};

deque.putFirst("a");
deque.putFirst("b");
deque.putFirst("c");
deque.putFirst("d");

ExecutorService pool = Executors.newFixedThreadPool(4);
pool.submit(rb);
pool.submit(rb);
pool.submit(rb);
pool.submit(rb);

我在找什么: 一种 b C d

它实际输出的是什么: b C 一种 d

或随机排序

有什么简单的方法可以解决这个问题吗?谢谢!

在你的情况下,问题是

System.out.println(deque.takeLast());

实际上是两条指令,它们一起不是原子的。想象一下这样的场景:

  1. 线程 1 从队列中取出字符串。
  2. 线程 2 从队列中取出字符串。
  3. 线程 2 打印值。
  4. 线程 1 打印值。

所以这完全取决于操作系统如何管理线程执行。

在您的情况下,一种可能的解决方案是将 synchronized 关键字添加到 run 方法:

Runnable rb = new Runnable() {
    public synchronized void run() {
         try {
              String s = deque.takeLast();
              System.out.println(s);
         } catch (InterruptedException e) {
              e.printStackTrace();
         }
    }
};

这将同步您在此处创建的匿名实例 class。由于您将相同的可运行对象传递给 ExecutorService - 它应该可以工作。 或者你可以在你的 queue 对象上同步,因为你的可运行对象可以访问队列对象,当你将它传递给 ExecutorService 时,将在许多线程中执行:

Runnable rb = new Runnable() {
    public void run() {
        synchronized (deque) {
             try {
                 String s = deque.takeLast();
                 System.out.println(s);
             } catch (InterruptedException e) {
                 e.printStackTrace();
             }
        }
    }
};

还要记住关闭线程池,因为现在您的应用程序将永远不会退出。