如何在单个 Akka-http 流中读取多个网络套接字?

How can I read several web-sockets within a single Akka-http Flow?

我目前正在通过尝试建立多个 websockets 连接来练习 Akka-http。我用于创建 websockets 客户端流程(片段)的代码如下所示:

val webSocketFlow =
  Http().webSocketClientFlow(WebSocketRequest(url), settings = customSettings)

val (upgradeResponse, closed) =
  outgoing
    .viaMat(webSocketFlow)(Keep.right)
    .viaMat(decoder)(Keep.left)
    .toMat(sink)(Keep.both)
    .run()

如果我有一个 url,这目前效果很好。我很好奇如何扩展它以连接到多个 urls。因此,例如,如果我有一个不确定的 websockets 端点列表 List("ws://localhost:8080/foo", "ws://localhost:8080/bar", "ws://localhost:8080/baz").

我考虑过为每个 URL 添加一个新流,但是如果我有一长串 websockets endpoints/urls 怎么办?然后,这变得很麻烦,而且明显是手动的。我还考虑过将其包装到一个函数中,并在给定的可迭代对象中调用每个 URL。但这也让人觉得太过分了。

有没有一种方法可以将连接池全部归入一个流(或类似的流)?也欢迎进一步阅读。作为“可有可无”,是否还有一种方法可以标记传入的消息,以发出 url 它们来自的信号?

更新:为澄清起见,我只从 websockets(仅客户端)读取并且不发回任何消息。

您需要的是某种方式来合并各种 WebSocket 流,以便您可以处理传入的消息,就好像它们来自单个源一样。

因为你不需要发送任何数据,只需要接收实现就很简单了。

让我们开始创建一个函数,该函数将为给定的 uri 创建一个 WebSocket 源:

def webSocketSource(uri: Uri): Source[Message, Future[WebSocketUpgradeResponse]] = {
  Source.empty.viaMat(Http().webSocketClientFlow(uri))(Keep.right)
}

由于您不关心发送数据,该函数通过提供一个空源立即关闭输出通道。结果是包含从 WebSocket 读取的消息的源。

此时我们可以使用这个函数为每个uri创建一个专用源:

val wsSources: List[Source[Message, NotUsed]] = uris.map { uri =>
  webSocketSource(uri).mapMaterializedValue { respFuture =>
    respFuture.map {
      case _: ValidUpgrade => log.debug(s"Websocket upgrade for [${uri}] successful")
      case err: InvalidUpgradeResponse => log.error(s"Websocket upgrade for [${uri}] failed: ${err.cause}")
    }

    NotUsed
  }
}

这里我们需要以某种方式处理物化值,因为不可能(或至少不容易)将它们组合起来,因为我们不知道它们有多少。所以这里我们采用最简单的日志记录方法。

现在我们已经准备好源代码,我们可以继续合并它们了:

val mergedSource: Source[Message, NotUsed] = wsSources match {
  case s1 :: s2 :: rest => Source.combine(s1, s2, rest: _*)(Merge(_))
  case s1 :: Nil => s1
  case Nil => Source.empty[Message]
}

这里的想法是,如果我们有 2 个或更多 uris,我们实际上会进行合并操作,否则如果我们只有一个,我们就直接使用它而不做任何修改。最后,我们还介绍了我们根本没有任何 uri 的情况,方法是提供一个空的 Source,它将简单地终止流而不会出错。

此时我们可以将此源与您已有的流和接收器结合起来,运行它:

val done: Future[Done] = mergedSource.via(decoder).toMat(sink)(Keep.right).run

这给了我们一个单一的未来,它将在所有连接完成时完成,或者一旦一个连接失败就会失败。

这应该有效(代码正在文本框中写入...):

def taggedWebsocketForUrl(url: String, tag: Int): Source[(Int, Message), Future[WebSocketUpgradeResponse]] =
  outgoing.viaMat(Http().webSocketClientFlow(WebSocketRequest(url), settings = customSettings))(Keep.right).map(tag -> _)

val websocketMergedSource: Source[(Int, Message), Seq[Future[WebSocketUpgradeResponse]]] = {
  // You could replace this with a mess of headOptions etc., but...
  if (websocketUrls.isEmpty) Source.empty[(Int, Message)].mapMaterializedValue(_ => Seq(Future.failed(new NoSuchElementException("no websocket URLs"))))
  else {
    val first: Source[(Int, Message), List[Future[WebSocketUpgradeResponse]]] =
      taggedWebsocketForUrl(websocketUrls.head, 0).mapMaterializedValue(List(_))
    if (websocketUrls.tail.isEmpty) first
    else {
      websocketUrls.tail.foldLeft(first -> 1) {
        (acc, url) =>
          val newSource = acc._1.mergeMat(taggedWebsocketForUrl(url, acc._2)) {
            (futs: List[Future[WebSocketUpgradeResponse]], fut: Future[WebSocketUpgradeResponse]) =>
              fut :: futs // Will reverse at the end...
          }
          newSource -> (acc._2 + 1)
      }._1.mapMaterializedValue(_.reverse)
    }
  }
}

有了这个,您将有许多升级响应(您可以 mapMaterializedValue(Future.sequence _) 将它们组合成一个 Future[Seq[WebsocketUpgradeResponse]],如果有任何失败就会失败)。来自列表中第 n 个 url 的消息将被标记为 n

请注意,websocketUrls 是一个 List 引导折叠:如果有 n url,来自第一个 [=34] 的消息=] 将经历 n-1 个合并阶段,而最后一个 url 将仅经历 1 个合并阶段,因此您希望放置您希望产生更多流量的 urls接近列表的末尾。

另一种更有效的方法是使用 VectorArray 之类的 IndexedSeq 来分而治之以构建 merge 树。

使用 Akka Streams GraphDSL 也会给你很多控制权,但我倾向于将其用作最后的手段。