如何在单个 Akka-http 流中读取多个网络套接字?
How can I read several web-sockets within a single Akka-http Flow?
我目前正在通过尝试建立多个 websockets 连接来练习 Akka-http。我用于创建 websockets 客户端流程(片段)的代码如下所示:
val webSocketFlow =
Http().webSocketClientFlow(WebSocketRequest(url), settings = customSettings)
val (upgradeResponse, closed) =
outgoing
.viaMat(webSocketFlow)(Keep.right)
.viaMat(decoder)(Keep.left)
.toMat(sink)(Keep.both)
.run()
如果我有一个 url,这目前效果很好。我很好奇如何扩展它以连接到多个 urls。因此,例如,如果我有一个不确定的 websockets 端点列表 List("ws://localhost:8080/foo", "ws://localhost:8080/bar", "ws://localhost:8080/baz")
.
我考虑过为每个 URL 添加一个新流,但是如果我有一长串 websockets endpoints/urls 怎么办?然后,这变得很麻烦,而且明显是手动的。我还考虑过将其包装到一个函数中,并在给定的可迭代对象中调用每个 URL。但这也让人觉得太过分了。
有没有一种方法可以将连接池全部归入一个流(或类似的流)?也欢迎进一步阅读。作为“可有可无”,是否还有一种方法可以标记传入的消息,以发出 url 它们来自的信号?
更新:为澄清起见,我只从 websockets(仅客户端)读取并且不发回任何消息。
您需要的是某种方式来合并各种 WebSocket 流,以便您可以处理传入的消息,就好像它们来自单个源一样。
因为你不需要发送任何数据,只需要接收实现就很简单了。
让我们开始创建一个函数,该函数将为给定的 uri 创建一个 WebSocket 源:
def webSocketSource(uri: Uri): Source[Message, Future[WebSocketUpgradeResponse]] = {
Source.empty.viaMat(Http().webSocketClientFlow(uri))(Keep.right)
}
由于您不关心发送数据,该函数通过提供一个空源立即关闭输出通道。结果是包含从 WebSocket 读取的消息的源。
此时我们可以使用这个函数为每个uri创建一个专用源:
val wsSources: List[Source[Message, NotUsed]] = uris.map { uri =>
webSocketSource(uri).mapMaterializedValue { respFuture =>
respFuture.map {
case _: ValidUpgrade => log.debug(s"Websocket upgrade for [${uri}] successful")
case err: InvalidUpgradeResponse => log.error(s"Websocket upgrade for [${uri}] failed: ${err.cause}")
}
NotUsed
}
}
这里我们需要以某种方式处理物化值,因为不可能(或至少不容易)将它们组合起来,因为我们不知道它们有多少。所以这里我们采用最简单的日志记录方法。
现在我们已经准备好源代码,我们可以继续合并它们了:
val mergedSource: Source[Message, NotUsed] = wsSources match {
case s1 :: s2 :: rest => Source.combine(s1, s2, rest: _*)(Merge(_))
case s1 :: Nil => s1
case Nil => Source.empty[Message]
}
这里的想法是,如果我们有 2 个或更多 uris,我们实际上会进行合并操作,否则如果我们只有一个,我们就直接使用它而不做任何修改。最后,我们还介绍了我们根本没有任何 uri 的情况,方法是提供一个空的 Source,它将简单地终止流而不会出错。
此时我们可以将此源与您已有的流和接收器结合起来,运行它:
val done: Future[Done] = mergedSource.via(decoder).toMat(sink)(Keep.right).run
这给了我们一个单一的未来,它将在所有连接完成时完成,或者一旦一个连接失败就会失败。
这应该有效(代码正在文本框中写入...):
def taggedWebsocketForUrl(url: String, tag: Int): Source[(Int, Message), Future[WebSocketUpgradeResponse]] =
outgoing.viaMat(Http().webSocketClientFlow(WebSocketRequest(url), settings = customSettings))(Keep.right).map(tag -> _)
val websocketMergedSource: Source[(Int, Message), Seq[Future[WebSocketUpgradeResponse]]] = {
// You could replace this with a mess of headOptions etc., but...
if (websocketUrls.isEmpty) Source.empty[(Int, Message)].mapMaterializedValue(_ => Seq(Future.failed(new NoSuchElementException("no websocket URLs"))))
else {
val first: Source[(Int, Message), List[Future[WebSocketUpgradeResponse]]] =
taggedWebsocketForUrl(websocketUrls.head, 0).mapMaterializedValue(List(_))
if (websocketUrls.tail.isEmpty) first
else {
websocketUrls.tail.foldLeft(first -> 1) {
(acc, url) =>
val newSource = acc._1.mergeMat(taggedWebsocketForUrl(url, acc._2)) {
(futs: List[Future[WebSocketUpgradeResponse]], fut: Future[WebSocketUpgradeResponse]) =>
fut :: futs // Will reverse at the end...
}
newSource -> (acc._2 + 1)
}._1.mapMaterializedValue(_.reverse)
}
}
}
有了这个,您将有许多升级响应(您可以 mapMaterializedValue(Future.sequence _)
将它们组合成一个 Future[Seq[WebsocketUpgradeResponse]]
,如果有任何失败就会失败)。来自列表中第 n
个 url 的消息将被标记为 n
。
请注意,websocketUrls
是一个 List
引导折叠:如果有 n
url,来自第一个 [=34] 的消息=] 将经历 n
-1 个合并阶段,而最后一个 url 将仅经历 1 个合并阶段,因此您希望放置您希望产生更多流量的 urls接近列表的末尾。
另一种更有效的方法是使用 Vector
或 Array
之类的 IndexedSeq
来分而治之以构建 merge
树。
使用 Akka Streams GraphDSL
也会给你很多控制权,但我倾向于将其用作最后的手段。
我目前正在通过尝试建立多个 websockets 连接来练习 Akka-http。我用于创建 websockets 客户端流程(片段)的代码如下所示:
val webSocketFlow =
Http().webSocketClientFlow(WebSocketRequest(url), settings = customSettings)
val (upgradeResponse, closed) =
outgoing
.viaMat(webSocketFlow)(Keep.right)
.viaMat(decoder)(Keep.left)
.toMat(sink)(Keep.both)
.run()
如果我有一个 url,这目前效果很好。我很好奇如何扩展它以连接到多个 urls。因此,例如,如果我有一个不确定的 websockets 端点列表 List("ws://localhost:8080/foo", "ws://localhost:8080/bar", "ws://localhost:8080/baz")
.
我考虑过为每个 URL 添加一个新流,但是如果我有一长串 websockets endpoints/urls 怎么办?然后,这变得很麻烦,而且明显是手动的。我还考虑过将其包装到一个函数中,并在给定的可迭代对象中调用每个 URL。但这也让人觉得太过分了。
有没有一种方法可以将连接池全部归入一个流(或类似的流)?也欢迎进一步阅读。作为“可有可无”,是否还有一种方法可以标记传入的消息,以发出 url 它们来自的信号?
更新:为澄清起见,我只从 websockets(仅客户端)读取并且不发回任何消息。
您需要的是某种方式来合并各种 WebSocket 流,以便您可以处理传入的消息,就好像它们来自单个源一样。
因为你不需要发送任何数据,只需要接收实现就很简单了。
让我们开始创建一个函数,该函数将为给定的 uri 创建一个 WebSocket 源:
def webSocketSource(uri: Uri): Source[Message, Future[WebSocketUpgradeResponse]] = {
Source.empty.viaMat(Http().webSocketClientFlow(uri))(Keep.right)
}
由于您不关心发送数据,该函数通过提供一个空源立即关闭输出通道。结果是包含从 WebSocket 读取的消息的源。
此时我们可以使用这个函数为每个uri创建一个专用源:
val wsSources: List[Source[Message, NotUsed]] = uris.map { uri =>
webSocketSource(uri).mapMaterializedValue { respFuture =>
respFuture.map {
case _: ValidUpgrade => log.debug(s"Websocket upgrade for [${uri}] successful")
case err: InvalidUpgradeResponse => log.error(s"Websocket upgrade for [${uri}] failed: ${err.cause}")
}
NotUsed
}
}
这里我们需要以某种方式处理物化值,因为不可能(或至少不容易)将它们组合起来,因为我们不知道它们有多少。所以这里我们采用最简单的日志记录方法。
现在我们已经准备好源代码,我们可以继续合并它们了:
val mergedSource: Source[Message, NotUsed] = wsSources match {
case s1 :: s2 :: rest => Source.combine(s1, s2, rest: _*)(Merge(_))
case s1 :: Nil => s1
case Nil => Source.empty[Message]
}
这里的想法是,如果我们有 2 个或更多 uris,我们实际上会进行合并操作,否则如果我们只有一个,我们就直接使用它而不做任何修改。最后,我们还介绍了我们根本没有任何 uri 的情况,方法是提供一个空的 Source,它将简单地终止流而不会出错。
此时我们可以将此源与您已有的流和接收器结合起来,运行它:
val done: Future[Done] = mergedSource.via(decoder).toMat(sink)(Keep.right).run
这给了我们一个单一的未来,它将在所有连接完成时完成,或者一旦一个连接失败就会失败。
这应该有效(代码正在文本框中写入...):
def taggedWebsocketForUrl(url: String, tag: Int): Source[(Int, Message), Future[WebSocketUpgradeResponse]] =
outgoing.viaMat(Http().webSocketClientFlow(WebSocketRequest(url), settings = customSettings))(Keep.right).map(tag -> _)
val websocketMergedSource: Source[(Int, Message), Seq[Future[WebSocketUpgradeResponse]]] = {
// You could replace this with a mess of headOptions etc., but...
if (websocketUrls.isEmpty) Source.empty[(Int, Message)].mapMaterializedValue(_ => Seq(Future.failed(new NoSuchElementException("no websocket URLs"))))
else {
val first: Source[(Int, Message), List[Future[WebSocketUpgradeResponse]]] =
taggedWebsocketForUrl(websocketUrls.head, 0).mapMaterializedValue(List(_))
if (websocketUrls.tail.isEmpty) first
else {
websocketUrls.tail.foldLeft(first -> 1) {
(acc, url) =>
val newSource = acc._1.mergeMat(taggedWebsocketForUrl(url, acc._2)) {
(futs: List[Future[WebSocketUpgradeResponse]], fut: Future[WebSocketUpgradeResponse]) =>
fut :: futs // Will reverse at the end...
}
newSource -> (acc._2 + 1)
}._1.mapMaterializedValue(_.reverse)
}
}
}
有了这个,您将有许多升级响应(您可以 mapMaterializedValue(Future.sequence _)
将它们组合成一个 Future[Seq[WebsocketUpgradeResponse]]
,如果有任何失败就会失败)。来自列表中第 n
个 url 的消息将被标记为 n
。
请注意,websocketUrls
是一个 List
引导折叠:如果有 n
url,来自第一个 [=34] 的消息=] 将经历 n
-1 个合并阶段,而最后一个 url 将仅经历 1 个合并阶段,因此您希望放置您希望产生更多流量的 urls接近列表的末尾。
另一种更有效的方法是使用 Vector
或 Array
之类的 IndexedSeq
来分而治之以构建 merge
树。
使用 Akka Streams GraphDSL
也会给你很多控制权,但我倾向于将其用作最后的手段。