如何始终保持传入的 websocket 连接打开?
How to keep my incoming websocket connection open all the time?
我使用此示例代码客户端连接到我的 websocket 服务,但目前它只是连接然后关闭。
如何保持此连接打开且永不关闭?
建立连接后,我希望它保持打开状态,直到我关闭应用程序。
package docs.http.scaladsl
import akka.actor.ActorSystem
import akka.Done
import akka.http.scaladsl.Http
import akka.stream.scaladsl._
import akka.http.scaladsl.model._
import akka.http.scaladsl.model.ws._
import scala.concurrent.Future
object WebSocketClientFlow {
def main(args: Array[String]): Unit = {
implicit val system = ActorSystem()
import system.dispatcher
// Future[Done] is the materialized value of Sink.foreach,
// emitted when the stream completes
val incoming: Sink[Message, Future[Done]] =
Sink.foreach[Message] {
case message: TextMessage.Strict =>
println(message.text)
case _ =>
// ignore other message types
}
// send this as a message over the WebSocket
val outgoing = Source.single(TextMessage("hello world!"))
// flow to use (note: not re-usable!)
val webSocketFlow = Http().webSocketClientFlow(WebSocketRequest("ws://echo.websocket.org"))
// the materialized value is a tuple with
// upgradeResponse is a Future[WebSocketUpgradeResponse] that
// completes or fails when the connection succeeds or fails
// and closed is a Future[Done] with the stream completion from the incoming sink
val (upgradeResponse, closed) =
outgoing
.viaMat(webSocketFlow)(Keep.right) // keep the materialized Future[WebSocketUpgradeResponse]
.toMat(incoming)(Keep.both) // also keep the Future[Done]
.run()
// just like a regular http request we can access response status which is available via upgrade.response.status
// status code 101 (Switching Protocols) indicates that server support WebSockets
val connected = upgradeResponse.flatMap { upgrade =>
if (upgrade.response.status == StatusCodes.SwitchingProtocols) {
Future.successful(Done)
} else {
throw new RuntimeException(s"Connection failed: ${upgrade.response.status}")
}
}
// in a real application you would not side effect here
connected.onComplete(println)
closed.foreach(_ => println("closed"))
}
}
更新
我的代码与上面相同,只是我将源代码更新为如下所示:
val source1 = Source.single(TextMessage("""{"action":"auth","params":"APIKEY_123"}"""))
val source2 = Source.single(TextMessage("""{"action":"subscribe","params":"topic123"}"""))
val sources: Source[Message, NotUsed] =
Source.combine(source1, source2, Source.maybe)(Concat(_))
所以我可以看到我的 source1 和 source2 被发送到 websocket,但是 websocket 没有开始传输它应该的值,它只是挂起。
不确定我做错了什么...
The Akka docs call out your situation:
The Akka HTTP WebSocket API does not support half-closed connections which means that if either stream completes the entire connection is closed (after a “Closing Handshake” has been exchanged or a timeout of 3 seconds has passed).
在您的例子中,outgoing
(作为 Source.single
)在发出 TextMessage
后立即完成。 webSocketFlow
收到完成消息,然后断开连接。
解决方案是在 outgoing
完成时延迟,甚至可能永远延迟(或至少直到应用程序被终止)。
在您不想通过 websocket 发送消息的情况下,两个标准源可能对延迟完成有用。
Source.maybe
实现为 Promise
,您可以使用可选的终止消息来完成它。除非并且直到承诺完成,否则它不会完成。
Source.never
永远不会完成。您可以通过不完成 Source.maybe
来实现此目的,但这比那样的开销要小。
那么它在代码中会是什么样子?
val outgoing =
Source.single(TextMessage("hello world!"))
.concat(Source.never)
对于 Source.maybe
,您需要 .concatMat
以便 Promise
可以完成;这确实意味着您将获得类似 val (completionPromise, upgradeResponse, closed)
的整体物化值:
val outgoing =
Source.single(TextMessage("hello world!"))
.concatMat(Source.maybe[TextMessage])(Keep.right)
val ((completionPromise, upgradeResponse), closed) =
outgoing
.viaMat(websocketFlow)(Keep.both)
.toMat(incoming)(Keep.both)
.run()
在您想通过套接字发送任意多条消息的情况下,Source.actorRef
或 Source.queue
很方便:将消息发送到物化 actor ref 以通过 websocket 连接发送它们(发送一个特殊的消息来完成源)或 offer
消息到队列然后 complete
它。
val outgoing =
Source.actorRef[TextMessage](
completionMatcher = {
case Done =>
CompletionStrategy.draining // send the messages already sent before completing
},
failureMatcher = PartialFunction.empty,
bufferSize = 100,
overflowStrategy = OverflowStrategy.dropNew
)
val ((sendToSocketRef, upgradeResponse), closed) =
outgoing
.viaMat(websocketFlow)(Keep.both)
.toMat(incoming)(Keep.both)
.run()
sendToSocketRef ! TextMessage("hello world!")
我使用此示例代码客户端连接到我的 websocket 服务,但目前它只是连接然后关闭。
如何保持此连接打开且永不关闭?
建立连接后,我希望它保持打开状态,直到我关闭应用程序。
package docs.http.scaladsl
import akka.actor.ActorSystem
import akka.Done
import akka.http.scaladsl.Http
import akka.stream.scaladsl._
import akka.http.scaladsl.model._
import akka.http.scaladsl.model.ws._
import scala.concurrent.Future
object WebSocketClientFlow {
def main(args: Array[String]): Unit = {
implicit val system = ActorSystem()
import system.dispatcher
// Future[Done] is the materialized value of Sink.foreach,
// emitted when the stream completes
val incoming: Sink[Message, Future[Done]] =
Sink.foreach[Message] {
case message: TextMessage.Strict =>
println(message.text)
case _ =>
// ignore other message types
}
// send this as a message over the WebSocket
val outgoing = Source.single(TextMessage("hello world!"))
// flow to use (note: not re-usable!)
val webSocketFlow = Http().webSocketClientFlow(WebSocketRequest("ws://echo.websocket.org"))
// the materialized value is a tuple with
// upgradeResponse is a Future[WebSocketUpgradeResponse] that
// completes or fails when the connection succeeds or fails
// and closed is a Future[Done] with the stream completion from the incoming sink
val (upgradeResponse, closed) =
outgoing
.viaMat(webSocketFlow)(Keep.right) // keep the materialized Future[WebSocketUpgradeResponse]
.toMat(incoming)(Keep.both) // also keep the Future[Done]
.run()
// just like a regular http request we can access response status which is available via upgrade.response.status
// status code 101 (Switching Protocols) indicates that server support WebSockets
val connected = upgradeResponse.flatMap { upgrade =>
if (upgrade.response.status == StatusCodes.SwitchingProtocols) {
Future.successful(Done)
} else {
throw new RuntimeException(s"Connection failed: ${upgrade.response.status}")
}
}
// in a real application you would not side effect here
connected.onComplete(println)
closed.foreach(_ => println("closed"))
}
}
更新 我的代码与上面相同,只是我将源代码更新为如下所示:
val source1 = Source.single(TextMessage("""{"action":"auth","params":"APIKEY_123"}"""))
val source2 = Source.single(TextMessage("""{"action":"subscribe","params":"topic123"}"""))
val sources: Source[Message, NotUsed] =
Source.combine(source1, source2, Source.maybe)(Concat(_))
所以我可以看到我的 source1 和 source2 被发送到 websocket,但是 websocket 没有开始传输它应该的值,它只是挂起。
不确定我做错了什么...
The Akka docs call out your situation:
The Akka HTTP WebSocket API does not support half-closed connections which means that if either stream completes the entire connection is closed (after a “Closing Handshake” has been exchanged or a timeout of 3 seconds has passed).
在您的例子中,outgoing
(作为 Source.single
)在发出 TextMessage
后立即完成。 webSocketFlow
收到完成消息,然后断开连接。
解决方案是在 outgoing
完成时延迟,甚至可能永远延迟(或至少直到应用程序被终止)。
在您不想通过 websocket 发送消息的情况下,两个标准源可能对延迟完成有用。
Source.maybe
实现为Promise
,您可以使用可选的终止消息来完成它。除非并且直到承诺完成,否则它不会完成。Source.never
永远不会完成。您可以通过不完成Source.maybe
来实现此目的,但这比那样的开销要小。
那么它在代码中会是什么样子?
val outgoing =
Source.single(TextMessage("hello world!"))
.concat(Source.never)
对于 Source.maybe
,您需要 .concatMat
以便 Promise
可以完成;这确实意味着您将获得类似 val (completionPromise, upgradeResponse, closed)
的整体物化值:
val outgoing =
Source.single(TextMessage("hello world!"))
.concatMat(Source.maybe[TextMessage])(Keep.right)
val ((completionPromise, upgradeResponse), closed) =
outgoing
.viaMat(websocketFlow)(Keep.both)
.toMat(incoming)(Keep.both)
.run()
在您想通过套接字发送任意多条消息的情况下,Source.actorRef
或 Source.queue
很方便:将消息发送到物化 actor ref 以通过 websocket 连接发送它们(发送一个特殊的消息来完成源)或 offer
消息到队列然后 complete
它。
val outgoing =
Source.actorRef[TextMessage](
completionMatcher = {
case Done =>
CompletionStrategy.draining // send the messages already sent before completing
},
failureMatcher = PartialFunction.empty,
bufferSize = 100,
overflowStrategy = OverflowStrategy.dropNew
)
val ((sendToSocketRef, upgradeResponse), closed) =
outgoing
.viaMat(websocketFlow)(Keep.both)
.toMat(incoming)(Keep.both)
.run()
sendToSocketRef ! TextMessage("hello world!")