BlockingQueue 与 PipedOutputStream 和 PipedInputStream

BlockingQueue vs PipedOutputStream and PipedInputStream

我想知道使用 BlockingQueue 代替(PipedOutputStreamPipedInputStream

的优势
import java.io.*;
import java.util.concurrent.*;


public class PipedStreamVsBlocking {

  public static void main(String... args) {

    BlockingQueue<Integer> blockingQueue = new LinkedBlockingDeque<>(2);
    ExecutorService executor = Executors.newFixedThreadPool(4);
    Runnable producerTask = () -> {
      try {
        while (true) {
          int value = ThreadLocalRandom.current().nextInt(0, 1000);
          blockingQueue.put(value);
          System.out.println("BlockingQueue.Produced " + value);
          int timeSleeping = ThreadLocalRandom.current().nextInt(500, 1000);
          Thread.sleep(timeSleeping);
        }
      } catch (InterruptedException e) {
        e.printStackTrace();
      }
    };
    Runnable consumerTask = () -> {
      try {
        while (true) {
          int value = blockingQueue.take();
          System.out.println("BlockingQueue.Consume " + value);
          int timeSleeping = ThreadLocalRandom.current().nextInt(500, 1000);
          Thread.sleep(timeSleeping);
        }
      } catch (InterruptedException e) {
        e.printStackTrace();
      }
    };

    PipedOutputStream pipedSrc = new PipedOutputStream();
    PipedInputStream pipedSnk = new PipedInputStream();
    try {
      pipedSnk.connect(pipedSrc);
    } catch (IOException e) {
      e.printStackTrace();
    }

    Runnable runnablePut2 = () -> {
      try {
        ObjectOutputStream oos = new ObjectOutputStream(pipedSrc);
        while (true) {
          int value = ThreadLocalRandom.current().nextInt(0, 1000);
          oos.writeInt(value);
          oos.flush();
          System.out.println("PipedStream.Produced " + value);
          int timeSleeping = ThreadLocalRandom.current().nextInt(500, 1000);
          Thread.sleep(timeSleeping);
        }
      } catch (Exception e) {
        e.printStackTrace();
      }
    };

    Runnable runnableGet2 = () -> {
      try {
        ObjectInputStream ois = new ObjectInputStream(pipedSnk);
        while (true) {
          int value = ois.readInt();
          System.out.println("PipedStream.Consume " + value);
          int timeSleeping = ThreadLocalRandom.current().nextInt(500, 1000);
          Thread.sleep(timeSleeping);
        }
      } catch (Exception e) {
        e.printStackTrace();
      }
    };
    executor.execute(producerTask);
    executor.execute(consumerTask);
    executor.execute(runnablePut2);
    executor.execute(runnableGet2);
    executor.shutdown();
  }

}

这段代码的输出是:

BlockingQueue.Consume 298
BlockingQueue.Produced 298
PipedStream.Produced 510
PipedStream.Consume 510
BlockingQueue.Produced 536
BlockingQueue.Consume 536
PipedStream.Produced 751
PipedStream.Consume 751
PipedStream.Produced 619
BlockingQueue.Produced 584
BlockingQueue.Consume 584
PipedStream.Consume 619
BlockingQueue.Produced 327
PipedStream.Produced 72
BlockingQueue.Consume 327
PipedStream.Consume 72
BlockingQueue.Produced 823
BlockingQueue.Consume 823
PipedStream.Produced 544
PipedStream.Consume 544
BlockingQueue.Produced 352
BlockingQueue.Consume 352
PipedStream.Produced 134
PipedStream.Consume 134

我认为使用PipedStream(PipedOutputStreamPipedInputStream)有优势,我知道什么时候数据是直接produced/Processed。

可能是我错了, 使用 BlockingQueue 而不是 Pipe。

但是,您的 comments/recommendations 没有在文档中找到。 为此,我需要知道我错过了什么。

为什么我应该使用 BlockingQueue 而不是 Piped?

与任何 Java Collection 一样,BlockingQueue 存储对对象的 引用 ,因此从中检索对象的线程接收完全相同的运行时对象,生产线程放入其中。

相比之下,Serialization 将持久化形式存储到字节流中,它仅适用于 Serializable 个对象,并且会导致在接收端创建副本。在某些情况下,对象可能会在之后被规范对象替换,但整个过程比仅传输引用要昂贵得多。

在您的示例中,您传输 int 值,对象标识并不重要,但装箱、序列化、反序列化和拆箱 Integer 实例的开销更值得怀疑.

如果你没有使用序列化,而是直接将 int 值作为四个 byte 个数量传输,使用 PipedOutputStreamPipedInputStream 有一点,因为它是传输大量原始数据的好工具。它还具有通过关闭管道来标记数据结束的内在支持。

这些管道也将是软件的正确工具,这些软件应该与进程甚至计算机无关 运行 生产者或消费者,即当您希望能够使用相同的软件时管道实际上是进程之间甚至是网络连接。这也证明使用序列化是合理的(就像 JMX 连接一样)。

但是除非您真正传输的是在被撕裂时保留其含义的单个字节,否则存在内在限制,即只有一个生产者可以写入管道并且只有一个消费者可以读取数据。