Akka Reactive Streams 处理器的目的

Akka Reactive Streams purpose of the Processor

我正在尝试理解 akka 中的 Reactive Streams。我已阅读此博客 http://bryangilbert.com/blog/2015/02/04/akka-reactive-streams/,我想我已基本了解其工作原理。然而,我不明白的是 Processer 在这个概念中的目的。它是做什么用的? Subscriber 请求 N-Objects 和 Publisher 使用 onNext() 发送它们还不够吗?

假设您有一个非常简单的流程,只有一个 Source(发布者)和一个 Sink(订阅者)。您将这两个连接起来,接收器订阅发布者并开始请求数据,数据流向接收器。在此示例中,您真正需要的只是发布者和订阅者。但在这个例子中,数据从源到接收器的过程中没有任何变化。它没有以任何方式进行转换,因此不是很有趣也不是很有用。

处理器结合了发布者和订阅者接口,因此可以充当这两个角色。处理器旨在嵌入到源和接收器之间的处理流中并转换数据。如果我将一个放入之前的 source/sink 示例中,数据流和订阅者将发生变化。现在,接收器订阅了该处理器,而处理器又订阅了源。接收器从处理器请求元素,处理器将该需求向上游传播到源。当有元素可以满足需求时,它还负责将元素推向下游。这就是为什么它必须实现这两个接口,因为它必须扮演这两个角色。

对于您添加的每个处理步骤,例如 mapfilter,您都在添加另一个可以处理背压的位置。这些步骤不是数据的起始点(源)或目的地(汇)。他们要做的就是接收数据并对其进行处理或改变数据流并向下游发送元素以满足需求。因为他们需要能够 link 进入任何链,所以他们需要发布和订阅功能,这就是 Processor 存在的原因。