Java 8 个用于输入数据的流实用程序

Java 8 Stream utilities for input data

想象一下,通过回调或 InputStream 获得某种传入数据,您需要 连续 将其转换为 Java 8 Stream.我们不知道传入数据流何时停止,但我们知道它可以停止。

到目前为止,我已经看到了解决此问题的两种方法,并且我对如何实现此问题的最佳实践很感兴趣。主要是因为我这一定是某人以前遇到过的事情。一定有比下面的想法更简单的方法。

1) 最简单的方法是将源视为 Supplier 并仅使用 Stream.generate 来提供数据:

Stream.generate(() -> blockCallToGetData());

然而,这有一个缺点,即流永远不会结束。因此,只要输入源停止发送,流就会继续调用该方法。除非我们自然地抛出运行时异常,但这会很快变得丑陋。

2) 第二个想法是使用 Iterator(转换为 Spliterator),其中 next 方法会阻塞,直到我们找到下一个元素。作为一个粗略的例子:

class BlockingIterator implements Iterator<Data> {

  @Override void boolean hasNext() {
    return true;
  }

  @Override Data next() {
    return blockCallToGetData();
  }

}

这样做的好处是我可以通过在hasNext方法中返回false来停止流。然而,在我们不控制传入数据速度的情况下(例如在回调中),我们需要为迭代器保留就绪元素的缓冲区。在有人对迭代器调用 next 之前,这个缓冲区可以无限大。

所以,我的问题是;将阻塞输入提供给流的最佳实践是什么?

该问题包含一个有问题的假设:将阻塞输入提供给流的良好做法。 Stream 不是反应式框架;虽然您可以用一根大撬棍将它楔入其中,但结果可能会在其他地方出现问题。 (EG 考虑了这些用例并得出结论,我们最好提供能够在一个问题上完成工作的东西,而不是在两个问题上提供一半的工作。)

如果您需要反应式框架,最好的做法是使用一个。 RxJava 很棒。

simple-react we solved this problem by using (simple-react) async Queues(JDK 队列数据结构的异步包装器)中,JDK 流可以从中读取。如果Queue关闭,Stream会自动断开。

快生产者/慢消费者问题可以通过队列来解决。如果(简单反应)异步队列由有界阻塞队列支持,一旦队列变满,它将自动减慢(阻塞)任何生产线程。

相比之下,LazyFutureStream 流实现在内部使用非阻塞队列,如果不存在数据,甚至会尝试将自己从队列中的数据消费者转变为生产者(因此它可以运行作为一个完全非阻塞的流)

使用 PushableStreamBuilder 的示例:

 PushableLazyFutureStream<Integer> pushable = new PushableStreamBuilder()
            .withBackPressureAfter(100)
            .withBackPressureOn(true)
            .pushableLazyFutureStream();

    // pushable.getInput().fromStream(input); would also be acceptable to add input data
    pushable.getInput().add(100); 
    pushable.getInput().close();

    List list = pushable.getStream().collect(Collectors.toList());

     //list is [100]