Java 8 流和 RxJava 可观察量之间的区别

Difference between Java 8 streams and RxJava observables

Java 8 个流与 RxJava observables 相似吗?

Java 8 流定义:

Classes in the new java.util.stream package provide a Stream API to support functional-style operations on streams of elements.

Java 8 Stream 和 RxJava 看起来很相似。它们具有相似的运算符(filter、map、flatMap...),但不是为相同的用途而构建的。

您可以使用 RxJava 执行异步任务。

使用 Java 8 流,您将遍历 collection 的项目。

你可以在 RxJava 中做几乎相同的事情(遍历 collection 的项目)但是,由于 RxJava 专注于并发任务,...,它使用同步,锁存器,...所以使用 RxJava 的相同任务可能比使用 Java 8 stream.

RxJava 可以与 CompletableFuture 进行比较,但它可以计算出不止一个值。

存在一些技术和概念上的差异,例如,Java 8 个流是一次性使用的、基于拉取的同步值序列,而 RxJava 可观察对象是可重新观察的、自适应推送的基于拉取的、可能异步的值序列。 RxJava 针对 Java 6+,也适用于 Android。

Java 8 流是基于拉取的。您迭代一个 Java 8 流来消耗每个项目。并且源源不断。

RXJava Observable 默认是基于推送的。您订阅了一个 Observable,您将在下一个项目到达 (onNext)、流完成 (onCompleted) 或发生错误 (onError) 时收到通知。 因为使用 Observable 您会收到 onNextonCompletedonError 事件,所以您可以执行一些强大的功能,例如将不同的 Observable 组合成一个新的(zipmergeconcat)。您可以做的其他事情是缓存,节流,...... 它在不同的语言中使用或多或少相同的 API(RxJava,C# 中的 RX,RxJS,...)

默认情况下 RxJava 是单线程的。除非您开始使用调度程序,否则一切都将发生在同一个线程上。

RxJava也与reactive streams initiative and considers it self as a simple implementation of the reactive streams API (e.g. compared to the Akka streams implementation). The main difference is, that the reactive streams are designed to be able to handle back pressure, but if you have a look at the reactive streams page, you will get the idea. They describe their goals pretty well and the streams are also closely related to the reactive manifesto密切相关。

Java 8 个流几乎是无限集合的实现,非常类似于 Scala Stream or the Clojure lazy seq

简答

所有 sequence/stream 处理库都提供非常相似的 API 管道构建。不同之处在于 API 用于处理多线程和管道组合。

长答案

RxJava 与 Stream 有很大的不同。在所有 JDK 事物中,最接近 rx.Observable 的可能是 java.util.stream.Collector Stream + CompletableFuture 组合(出现在处理额外 monad 层的成本,即必须处理 Stream<CompletableFuture<T>>CompletableFuture<Stream<T>> 之间的转换。

Observable 和 Stream 之间存在显着差异:

  • Streams 是拉式的,Observables 是推式的。这听起来可能过于抽象,但它具有非常具体的重要后果。
  • Stream只能使用一次,Observable可以订阅多次
  • Stream#parallel() 将序列拆分为多个分区,Observable#subscribeOn()Observable#observeOn() 则不会;用 Observable 模拟 Stream#parallel() 行为是很棘手的,它曾经有 .parallel() 方法,但是这个方法引起了很多混乱,以至于 .parallel() 支持被移动到单独的存储库:ReactiveX/RxJavaParallel: Experimental Parallel Extensions for RxJava. More details are in another answer.
  • Stream#parallel() 不允许指定要使用的线程池,这与大多数接受可选调度程序的 RxJava 方法不同。由于 JVM 中的 all 流实例使用相同的 fork-join 池,因此添加 .parallel() 可能会意外影响程序另一个模块中的行为。
  • 流缺少与时间相关的操作,如 Observable#interval()Observable#window() 等;这主要是因为 Streams 是基于拉取的,而上游无法控制 何时 向下游发出下一个元素。
  • 与 RxJava 相比,Streams 提供了一组受限的操作。例如。流缺少截止操作(takeWhile()takeUntil());使用 Stream#anyMatch() 的解决方法是有限的:它是终端操作,因此每个流不能多次使用它
  • 从JDK 8开始,没有Stream#zip()操作,这有时很有用。
  • Streams很难自己构建,Observable可以通过多种方式构建 编辑:正如评论中指出的那样,有多种方法构造流。但是,由于没有非终端短路,你不能 e。 G。轻松生成文件中的行流(JDK 提供开箱即用的 Files#lines()BufferedReader#lines(),其他类似的场景可以通过从 Iterator 构造 Stream 来管理)。
  • Observable 提供资源管理工具(Observable#using());你可以用它包装 IO 流或互斥量,并确保用户不会忘记释放资源——它会在订阅终止时自动处理; Stream 有 onClose(Runnable) 方法,但你必须手动或通过 try-with-resources 调用它。例如你必须记住 Files#lines() 必须 包含在 try-with-resources 块中。
  • Observables 自始至终都是同步的(我实际上并没有检查 Streams 是否也是如此)。这使您不必考虑基本操作是否是线程安全的(答案总是 'yes',除非有错误),但与并发相关的开销将存在,无论您的代码是否需要它。

综述

RxJava 与 Streams 有很大的不同。真正的 RxJava 替代品是 ReactiveStreams 的其他实现,例如。 G。 Akka 的相关部分。

更新

Stream#parallel 使用非默认 fork-join 池的技巧,请参阅 Custom thread pool in Java 8 parallel stream

更新

以上所有内容都是基于使用 RxJava 的经验 1.x。现在RxJava 2.x is here,这个答案可能已经过时了。

Java 8 Streams 可以有效地处理非常大的集合,同时利用多核架构。相反,RxJava 默认是单线程的(没有 Schedulers)。所以 RxJava 不会利用多核机器,除非你自己编写逻辑。

现有的答案是全面和正确的,但缺乏适合初学者的明确示例。请允许我在 "push/pull-based" 和 "re-observable" 等术语后面添加一些具体的术语。 注意:我讨厌这个词Observable(看在上帝的份上,它是一个流),所以将简单地引用 J8 与 RX 流。

考虑一个整数列表,

digits = [1,2,3,4,5]

J8 Stream 是一个用于修改集合的实用程序。例如偶数位可以提取为,

evens = digits.stream().filter(x -> x%2).collect(Collectors.toList())

这基本上是 Python 的 map, filter, reduce,是对 Java 的一个非常好的(并且早就应该)添加。但是如果没有提前收集数字怎么办——如果数字在应用程序 运行 时流入怎么办——我们可以实时过滤偶数吗?

想象一个单独的线程进程在随机时间输出整数,而应用程序是 运行(--- 表示时间)

digits = 12345---6------7--8--9-10--------11--12

在 RX 中,even可以对每个新数字做出反应并实时应用过滤器

even = -2-4-----6---------8----10------------12

无需存储输入和输出列表。如果您想要 一个输出列表,没问题,它也可以流式传输。事实上,everything is a stream.

evens_stored = even.collect()  

这就是为什么像 "stateless" 和 "functional" 这样的术语与 RX 更相关的原因