Java 8 个用于输入数据的流实用程序
Java 8 Stream utilities for input data
想象一下,通过回调或 InputStream
获得某种传入数据,您需要 连续 将其转换为 Java 8 Stream
.我们不知道传入数据流何时停止,但我们知道它可以停止。
到目前为止,我已经看到了解决此问题的两种方法,并且我对如何实现此问题的最佳实践很感兴趣。主要是因为我这一定是某人以前遇到过的事情。一定有比下面的想法更简单的方法。
1) 最简单的方法是将源视为 Supplier
并仅使用 Stream.generate
来提供数据:
Stream.generate(() -> blockCallToGetData());
然而,这有一个缺点,即流永远不会结束。因此,只要输入源停止发送,流就会继续调用该方法。除非我们自然地抛出运行时异常,但这会很快变得丑陋。
2) 第二个想法是使用 Iterator
(转换为 Spliterator
),其中 next
方法会阻塞,直到我们找到下一个元素。作为一个粗略的例子:
class BlockingIterator implements Iterator<Data> {
@Override void boolean hasNext() {
return true;
}
@Override Data next() {
return blockCallToGetData();
}
}
这样做的好处是我可以通过在hasNext
方法中返回false
来停止流。然而,在我们不控制传入数据速度的情况下(例如在回调中),我们需要为迭代器保留就绪元素的缓冲区。在有人对迭代器调用 next
之前,这个缓冲区可以无限大。
所以,我的问题是;将阻塞输入提供给流的最佳实践是什么?
该问题包含一个有问题的假设:是将阻塞输入提供给流的良好做法。 Stream 不是反应式框架;虽然您可以用一根大撬棍将它楔入其中,但结果可能会在其他地方出现问题。 (EG 考虑了这些用例并得出结论,我们最好提供能够在一个问题上完成工作的东西,而不是在两个问题上提供一半的工作。)
如果您需要反应式框架,最好的做法是使用一个。 RxJava 很棒。
在 simple-react we solved this problem by using (simple-react) async Queues(JDK 队列数据结构的异步包装器)中,JDK 流可以从中读取。如果Queue关闭,Stream会自动断开。
快生产者/慢消费者问题可以通过队列来解决。如果(简单反应)异步队列由有界阻塞队列支持,一旦队列变满,它将自动减慢(阻塞)任何生产线程。
相比之下,LazyFutureStream 流实现在内部使用非阻塞队列,如果不存在数据,甚至会尝试将自己从队列中的数据消费者转变为生产者(因此它可以运行作为一个完全非阻塞的流)
使用 PushableStreamBuilder 的示例:
PushableLazyFutureStream<Integer> pushable = new PushableStreamBuilder()
.withBackPressureAfter(100)
.withBackPressureOn(true)
.pushableLazyFutureStream();
// pushable.getInput().fromStream(input); would also be acceptable to add input data
pushable.getInput().add(100);
pushable.getInput().close();
List list = pushable.getStream().collect(Collectors.toList());
//list is [100]
想象一下,通过回调或 InputStream
获得某种传入数据,您需要 连续 将其转换为 Java 8 Stream
.我们不知道传入数据流何时停止,但我们知道它可以停止。
到目前为止,我已经看到了解决此问题的两种方法,并且我对如何实现此问题的最佳实践很感兴趣。主要是因为我这一定是某人以前遇到过的事情。一定有比下面的想法更简单的方法。
1) 最简单的方法是将源视为 Supplier
并仅使用 Stream.generate
来提供数据:
Stream.generate(() -> blockCallToGetData());
然而,这有一个缺点,即流永远不会结束。因此,只要输入源停止发送,流就会继续调用该方法。除非我们自然地抛出运行时异常,但这会很快变得丑陋。
2) 第二个想法是使用 Iterator
(转换为 Spliterator
),其中 next
方法会阻塞,直到我们找到下一个元素。作为一个粗略的例子:
class BlockingIterator implements Iterator<Data> {
@Override void boolean hasNext() {
return true;
}
@Override Data next() {
return blockCallToGetData();
}
}
这样做的好处是我可以通过在hasNext
方法中返回false
来停止流。然而,在我们不控制传入数据速度的情况下(例如在回调中),我们需要为迭代器保留就绪元素的缓冲区。在有人对迭代器调用 next
之前,这个缓冲区可以无限大。
所以,我的问题是;将阻塞输入提供给流的最佳实践是什么?
该问题包含一个有问题的假设:是将阻塞输入提供给流的良好做法。 Stream 不是反应式框架;虽然您可以用一根大撬棍将它楔入其中,但结果可能会在其他地方出现问题。 (EG 考虑了这些用例并得出结论,我们最好提供能够在一个问题上完成工作的东西,而不是在两个问题上提供一半的工作。)
如果您需要反应式框架,最好的做法是使用一个。 RxJava 很棒。
在 simple-react we solved this problem by using (simple-react) async Queues(JDK 队列数据结构的异步包装器)中,JDK 流可以从中读取。如果Queue关闭,Stream会自动断开。
快生产者/慢消费者问题可以通过队列来解决。如果(简单反应)异步队列由有界阻塞队列支持,一旦队列变满,它将自动减慢(阻塞)任何生产线程。
相比之下,LazyFutureStream 流实现在内部使用非阻塞队列,如果不存在数据,甚至会尝试将自己从队列中的数据消费者转变为生产者(因此它可以运行作为一个完全非阻塞的流)
使用 PushableStreamBuilder 的示例:
PushableLazyFutureStream<Integer> pushable = new PushableStreamBuilder()
.withBackPressureAfter(100)
.withBackPressureOn(true)
.pushableLazyFutureStream();
// pushable.getInput().fromStream(input); would also be acceptable to add input data
pushable.getInput().add(100);
pushable.getInput().close();
List list = pushable.getStream().collect(Collectors.toList());
//list is [100]