在源到达其边界后,到达服务器实现流处理的消息会发生什么?
What happens to messages that come to a server implements stream processing after the source reached its bound?
我正在学习 akka 流,但显然它与任何流框架都相关 :)
引用 akka 文档:
Reactive Streams is just to define a common mechanism of how to move
data across an asynchronous boundary without losses, buffering or
resource exhaustion
现在,据我了解,如果直到流之前,让我们以一个 http 服务器为例,请求将会到来,当接收者完成请求时,那么即将到来的新请求将是收集在一个缓冲区中,该缓冲区将保存等待的请求,然后存在一个问题,即该缓冲区的大小未知,并且在某些时候,如果服务器过载,我们可能会丢失等待的请求。
然后流处理开始发挥作用,他们将这个缓冲区限制为可控...所以我们可以预定义我们想要排队的消息数量(在我的例子中是请求),我们可以处理每个消息一次。
我的问题是,如果我们实现服务器中的源最多可以有 3 条消息,那么如果第 4 个 ID 出现会发生什么?
我的意思是,当另一台服务器呼叫我们时,我们已经处理了 3 个请求...他的请求会怎样?
您所描述的实际上并不是 Reactive Streams 实现解决的主要问题。
请求数量方面的背压已通过常规网络工具解决。例如,在 Java 中,您可以将网络库(例如 Netty)的线程池配置为某种并行级别,该库将负责接受尽可能多的请求。或者,如果您使用同步套接字 API,它甚至更简单 - 您可以推迟对服务器套接字的调用 accept()
,直到为所有当前连接的客户端提供服务。在任何一种情况下,两边都没有 "buffer",直到服务器接受连接,客户端才会被阻塞(在阻塞 APIs 的系统调用中,或者在事件循环中对于异步 APIs).
Reactive Streams 实现解决的是如何处理更高级别数据管道内的背压。反应流实现(例如 akka-streams)提供了一种构建数据管道的方法,当数据的消费者变慢时,生产者也会自动变慢,这适用于任何类型的底层传输,无论是 HTTP、WebSockets、原始 TCP 连接还是进程内消息传递。
例如,考虑一个简单的 WebSocket 连接,其中客户端发送连续的信息流(例如来自某个传感器的数据),服务器将此数据写入某个数据库。现在假设服务器端的数据库由于某种原因(网络问题、磁盘过载等)变慢了。服务器现在跟不上客户端发送的数据,也就是在新的数据到达之前不能及时保存到数据库中。如果您在整个管道中使用反应流实现,服务器将自动向客户端发出信号,表明它无法处理更多数据,并且客户端将自动调整其生成速率,以免服务器过载。
当然,这可以在没有任何 Reactive Streams 实现的情况下完成,例如通过手动控制确认。然而,与许多其他库一样,Reactive Streams 实现为您解决了这个问题。它们还提供了一种简单的方法来定义此类管道,并且通常它们具有用于各种外部系统(如数据库)的接口。特别是,此类库可能会在最低级别实施背压,直至 TCP 连接,这可能很难手动完成。
至于 Reactive Streams 本身,它只是对可以由库实现的 API 的描述,它定义了通用术语和行为,并允许此类库可以互换或轻松交互,例如您可以使用规范中的接口将 akka-streams 管道连接到 Monix 管道,组合后的管道将无缝工作并支持 Reactive Streams 的所有背压功能。
我正在学习 akka 流,但显然它与任何流框架都相关 :)
引用 akka 文档:
Reactive Streams is just to define a common mechanism of how to move data across an asynchronous boundary without losses, buffering or resource exhaustion
现在,据我了解,如果直到流之前,让我们以一个 http 服务器为例,请求将会到来,当接收者完成请求时,那么即将到来的新请求将是收集在一个缓冲区中,该缓冲区将保存等待的请求,然后存在一个问题,即该缓冲区的大小未知,并且在某些时候,如果服务器过载,我们可能会丢失等待的请求。
然后流处理开始发挥作用,他们将这个缓冲区限制为可控...所以我们可以预定义我们想要排队的消息数量(在我的例子中是请求),我们可以处理每个消息一次。
我的问题是,如果我们实现服务器中的源最多可以有 3 条消息,那么如果第 4 个 ID 出现会发生什么?
我的意思是,当另一台服务器呼叫我们时,我们已经处理了 3 个请求...他的请求会怎样?
您所描述的实际上并不是 Reactive Streams 实现解决的主要问题。
请求数量方面的背压已通过常规网络工具解决。例如,在 Java 中,您可以将网络库(例如 Netty)的线程池配置为某种并行级别,该库将负责接受尽可能多的请求。或者,如果您使用同步套接字 API,它甚至更简单 - 您可以推迟对服务器套接字的调用 accept()
,直到为所有当前连接的客户端提供服务。在任何一种情况下,两边都没有 "buffer",直到服务器接受连接,客户端才会被阻塞(在阻塞 APIs 的系统调用中,或者在事件循环中对于异步 APIs).
Reactive Streams 实现解决的是如何处理更高级别数据管道内的背压。反应流实现(例如 akka-streams)提供了一种构建数据管道的方法,当数据的消费者变慢时,生产者也会自动变慢,这适用于任何类型的底层传输,无论是 HTTP、WebSockets、原始 TCP 连接还是进程内消息传递。
例如,考虑一个简单的 WebSocket 连接,其中客户端发送连续的信息流(例如来自某个传感器的数据),服务器将此数据写入某个数据库。现在假设服务器端的数据库由于某种原因(网络问题、磁盘过载等)变慢了。服务器现在跟不上客户端发送的数据,也就是在新的数据到达之前不能及时保存到数据库中。如果您在整个管道中使用反应流实现,服务器将自动向客户端发出信号,表明它无法处理更多数据,并且客户端将自动调整其生成速率,以免服务器过载。
当然,这可以在没有任何 Reactive Streams 实现的情况下完成,例如通过手动控制确认。然而,与许多其他库一样,Reactive Streams 实现为您解决了这个问题。它们还提供了一种简单的方法来定义此类管道,并且通常它们具有用于各种外部系统(如数据库)的接口。特别是,此类库可能会在最低级别实施背压,直至 TCP 连接,这可能很难手动完成。
至于 Reactive Streams 本身,它只是对可以由库实现的 API 的描述,它定义了通用术语和行为,并允许此类库可以互换或轻松交互,例如您可以使用规范中的接口将 akka-streams 管道连接到 Monix 管道,组合后的管道将无缝工作并支持 Reactive Streams 的所有背压功能。