Akka Streams 中的 Flow#join 是什么

What is Flow#join in Akka Streams

我是 Scala 中 Akka Streams 的学习者。当我在 IncomingConnection 阅读时,我发现了 Flow#join。然后,我在Flow#join.

的评论里找到了下图
+------+        +-------+
|      | ~Out~> |       |
| this |        | other |
|      | <~In~  |       |
+------+        +-------+

但是,我想知道它的结构是什么。我认为 "join" 形成了一个循环。

所以我希望你解释一下 "join" 的结构,并告诉我使用 Flow#join

的简单示例代码

文档状态:

Join this Flow to another Flow, by cross connecting the inputs and outputs, creating a RunnableGraph

这是来自 akka.http.scaladsl 的一个很好的例子,可以帮助解释为什么这是有用的:

  /**
   * Represents one accepted incoming HTTP connection.
   */
  final case class IncomingConnection(
    localAddress: InetSocketAddress,
    remoteAddress: InetSocketAddress,
    flow: Flow[HttpResponse, HttpRequest, NotUsed]) {

    /**
     * Handles the connection with the given flow, which is materialized exactly once
     * and the respective materialization result returned.
     */
    def handleWith[Mat](handler: Flow[HttpRequest, HttpResponse, Mat])(implicit fm: Materializer): Mat =
      flow.joinMat(handler)(Keep.right).run()

您可能知道 Akka http 流的 handler 总是从 HttpRequest 流向 HttpResponse,但您可以看到 IncomingConnection.flow 流从 HttpResponseHttpRequest。换句话说,用户有责任根据请求创建响应,而 Akka Http 有责任发送该响应并产生另一个请求。当它涉及另一个流时,这确实形成了一个闭环,因此 join 方法创建了一个 RunnableGraph.

要了解如何处理连接,您应该多了解一些 BidiFlowBidiFlow#join 的结果是另一个流,因为 BidiFlow 有两个输入和两个输出。这是带有示例的 link to an excellent explanation