Akka websocket - 如何通过服务器关闭连接?
Akka websocket - how to close connection by server?
所以这是我的 websocket 服务器实现。
// WebSocket endpoint: a GET on the root path is handed over to `websocketFlow`.
val route = (get & pathEndOrSingleSlash) {
  handleWebSocketMessages(websocketFlow)
}
/** Per-connection WebSocket pipeline.
  *
  * Strict text frames are decoded with `protocol.hydrate`, passed through a chat flow
  * bound to a freshly generated connection id, and the resulting events are serialized
  * back into strict text frames. Any frame that is not a `TextMessage.Strict` is dropped
  * by the `collect` stage.
  */
def websocketFlow: Flow[Message, Message, Any] = {
  val connectionId = UUID.randomUUID()

  Flow[Message]
    .collect { case TextMessage.Strict(payload) => protocol.hydrate(payload) }
    .via(chatActorFlow(connectionId))
    .map { outgoing => TextMessage.Strict(protocol.serialize(outgoing)) }
}
/** Builds the per-connection flow that bridges the WebSocket stream and `chatRef`.
  *
  * Inbound side: every message is wrapped in `Protocol.SignedMessage(connectionId, msg)`
  * and sent to `chatRef`; when the inbound stream completes, `chatRef` receives
  * `Protocol.CloseConnection(connectionId)`.
  *
  * Outbound side: an actor-backed source. Its `ActorRef` is announced to `chatRef` through
  * `Protocol.OpenConnection`, so events sent to that actor are emitted to the client.
  *
  * @param connectionId identifier attached to everything this connection sends to `chatRef`
  */
def chatActorFlow(connectionId: UUID): Flow[Protocol.Message, Protocol.Event, Any] = {
  import akka.stream.OverflowStrategy

  val sink = Flow[Protocol.Message]
    .map(msg => Protocol.SignedMessage(connectionId, msg))
    .to(Sink.actorRef(chatRef, Protocol.CloseConnection(connectionId)))

  // Fix: the original invoked `.mapMaterializedValue` directly on the `Source` companion
  // object, so no source was ever created. An actor-backed source (buffer of 16 elements,
  // failing the stream on overflow) is what materializes the `ActorRef` registered below.
  val source = Source
    .actorRef[Protocol.Event](16, OverflowStrategy.fail)
    .mapMaterializedValue { actor: ActorRef =>
      chatRef ! Protocol.OpenConnection(actor, connectionId)
    }

  Flow.fromSinkAndSource(sink, source)
}
我想知道:在 `chatRef` 发送 `ConnectionClosed` 类型的消息之后,是否有办法关闭该连接?
下面的方案通过终止由 `Source.actorRef` 阶段物化(materialize)出来的 Actor,从服务器端断开连接。只需向该 Actor 发送 `PoisonPill` 即可。
不过,我仍不清楚你打算如何在连接时识别 "banned"(被封禁的)客户端,因此这个示例刻意保持得非常简单:一旦已连接的客户端数量达到上限,服务器就会断开此后新接入的任何客户端。如果你想在任意时刻按其他策略踢出客户端,仍可沿用同样的思路,向相应客户端各自的源 Actor 发送 `PoisonPill`。
/** Minimal Akka HTTP WebSocket chat server demonstrating a server-initiated disconnect.
  *
  * Every connection gets a UUID and an actor-backed outbound source. The `ChatRef` actor
  * tracks registered clients, echoes each text message back to the connection that sent
  * it, and turns away connections beyond `maximumClients` by sending `PoisonPill` to the
  * newcomer's source actor.
  */
object ChatApp extends App {
  import scala.util.{Failure, Success}

  implicit val system: ActorSystem = ActorSystem("chat")
  implicit val executor: ExecutionContextExecutor = system.dispatcher
  implicit val materializer: ActorMaterializer = ActorMaterializer()

  // A GET on the root path is upgraded to a WebSocket handled by `websocketFlow`.
  val route = get {
    pathEndOrSingleSlash {
      handleWebSocketMessages(websocketFlow)
    }
  }

  /** Maximum number of clients that may be registered at the same time. */
  val maximumClients = 1

  /** Registry of open connections keyed by connection id.
    *
    * The registry is held in the `clients` argument of the current behavior and is
    * replaced through `context.become` rather than mutated.
    */
  class ChatRef extends Actor {
    override def receive: Receive = withClients(Map.empty[UUID, ActorRef])

    /** Behavior for a given snapshot of registered clients.
      *
      * @param clients source actor of every registered connection, by connection id
      */
    def withClients(clients: Map[UUID, ActorRef]): Receive = {
      // Echo the message to the sending connection; ids that are not registered
      // (e.g. a client that was turned away) are ignored. Direct lookup replaces the
      // original full-map `collect` that was used only for its side effect.
      case Protocol.SignedMessage(uuid, msg) =>
        clients.get(uuid).foreach(_ ! msg)

      // Registry is full: stop the newcomer's source actor instead of registering it.
      // `>=` keeps the limit enforced even if the registry were ever above the cap.
      case Protocol.OpenConnection(ar, _) if clients.size >= maximumClients =>
        ar ! PoisonPill

      case Protocol.OpenConnection(ar, uuid) =>
        context.become(withClients(clients.updated(uuid, ar)))

      case Protocol.CloseConnection(uuid) =>
        context.become(withClients(clients - uuid))
    }
  }

  /** Messages exchanged between the connection flows and `ChatRef`. */
  object Protocol {
    case class SignedMessage(uuid: UUID, msg: String)
    case class OpenConnection(actor: ActorRef, uuid: UUID)
    case class CloseConnection(uuid: UUID)
  }

  val chatRef = system.actorOf(Props[ChatRef])

  /** WebSocket pipeline: collapses every text frame (strict or streamed) into a single
    * `String`, routes it through a chat flow bound to a fresh connection id, and wraps
    * the replies as text messages. A binary frame fails the stream.
    */
  def websocketFlow: Flow[Message, Message, Any] =
    Flow[Message]
      .mapAsync(1) {
        case TextMessage.Strict(s) => Future.successful(s)
        case TextMessage.Streamed(s) => s.runFold("")(_ + _)
        case _: BinaryMessage =>
          Future.failed[String](new Exception("Binary message cannot be handled"))
      }
      .via(chatActorFlow(UUID.randomUUID()))
      .map(TextMessage(_))

  /** Bridges one connection and `chatRef`.
    *
    * Inbound strings are signed with `connectionId` and forwarded to `chatRef`, which
    * receives `Protocol.CloseConnection(connectionId)` when the inbound side completes.
    * The outbound side is an actor-backed source (buffer of 16, failing on overflow)
    * whose `ActorRef` is announced to `chatRef` via `Protocol.OpenConnection`.
    *
    * @param connectionId identifier attached to everything this connection sends to `chatRef`
    */
  def chatActorFlow(connectionId: UUID): Flow[String, String, Any] = {
    val sink = Flow[String]
      .map(msg => Protocol.SignedMessage(connectionId, msg))
      .to(Sink.actorRef(chatRef, Protocol.CloseConnection(connectionId)))

    val source = Source
      .actorRef[String](16, OverflowStrategy.fail)
      .mapMaterializedValue { actor: ActorRef =>
        chatRef ! Protocol.OpenConnection(actor, connectionId)
      }

    Flow.fromSinkAndSource(sink, source)
  }

  Http()
    .bindAndHandle(route, "0.0.0.0", 8080)
    .onComplete {
      case Success(_) =>
        println("Started server...")
      case Failure(cause) =>
        // A failed bind (e.g. port already in use) used to be dropped silently by `.map`;
        // report it and shut the actor system down so the process can exit.
        Console.err.println(s"Failed to bind to 0.0.0.0:8080: ${cause.getMessage}")
        system.terminate()
    }
}
所以这是我的 websocket 服务器实现。
// WebSocket endpoint: a GET on the root path is handed over to `websocketFlow`.
val route = (get & pathEndOrSingleSlash) {
  handleWebSocketMessages(websocketFlow)
}
/** Per-connection WebSocket pipeline.
  *
  * Strict text frames are decoded with `protocol.hydrate`, passed through a chat flow
  * bound to a freshly generated connection id, and the resulting events are serialized
  * back into strict text frames. Any frame that is not a `TextMessage.Strict` is dropped
  * by the `collect` stage.
  */
def websocketFlow: Flow[Message, Message, Any] = {
  val connectionId = UUID.randomUUID()

  Flow[Message]
    .collect { case TextMessage.Strict(payload) => protocol.hydrate(payload) }
    .via(chatActorFlow(connectionId))
    .map { outgoing => TextMessage.Strict(protocol.serialize(outgoing)) }
}
/** Builds the per-connection flow that bridges the WebSocket stream and `chatRef`.
  *
  * Inbound side: every message is wrapped in `Protocol.SignedMessage(connectionId, msg)`
  * and sent to `chatRef`; when the inbound stream completes, `chatRef` receives
  * `Protocol.CloseConnection(connectionId)`.
  *
  * Outbound side: an actor-backed source. Its `ActorRef` is announced to `chatRef` through
  * `Protocol.OpenConnection`, so events sent to that actor are emitted to the client.
  *
  * @param connectionId identifier attached to everything this connection sends to `chatRef`
  */
def chatActorFlow(connectionId: UUID): Flow[Protocol.Message, Protocol.Event, Any] = {
  import akka.stream.OverflowStrategy

  val sink = Flow[Protocol.Message]
    .map(msg => Protocol.SignedMessage(connectionId, msg))
    .to(Sink.actorRef(chatRef, Protocol.CloseConnection(connectionId)))

  // Fix: the original invoked `.mapMaterializedValue` directly on the `Source` companion
  // object, so no source was ever created. An actor-backed source (buffer of 16 elements,
  // failing the stream on overflow) is what materializes the `ActorRef` registered below.
  val source = Source
    .actorRef[Protocol.Event](16, OverflowStrategy.fail)
    .mapMaterializedValue { actor: ActorRef =>
      chatRef ! Protocol.OpenConnection(actor, connectionId)
    }

  Flow.fromSinkAndSource(sink, source)
}
我想知道:在 `chatRef` 发送 `ConnectionClosed` 类型的消息之后,是否有办法关闭该连接?
下面的方案通过终止由 `Source.actorRef` 阶段物化(materialize)出来的 Actor,从服务器端断开连接。只需向该 Actor 发送 `PoisonPill` 即可。
不过,我仍不清楚你打算如何在连接时识别 "banned"(被封禁的)客户端,因此这个示例刻意保持得非常简单:一旦已连接的客户端数量达到上限,服务器就会断开此后新接入的任何客户端。如果你想在任意时刻按其他策略踢出客户端,仍可沿用同样的思路,向相应客户端各自的源 Actor 发送 `PoisonPill`。
/** Minimal Akka HTTP WebSocket chat server demonstrating a server-initiated disconnect.
  *
  * Every connection gets a UUID and an actor-backed outbound source. The `ChatRef` actor
  * tracks registered clients, echoes each text message back to the connection that sent
  * it, and turns away connections beyond `maximumClients` by sending `PoisonPill` to the
  * newcomer's source actor.
  */
object ChatApp extends App {
  import scala.util.{Failure, Success}

  implicit val system: ActorSystem = ActorSystem("chat")
  implicit val executor: ExecutionContextExecutor = system.dispatcher
  implicit val materializer: ActorMaterializer = ActorMaterializer()

  // A GET on the root path is upgraded to a WebSocket handled by `websocketFlow`.
  val route = get {
    pathEndOrSingleSlash {
      handleWebSocketMessages(websocketFlow)
    }
  }

  /** Maximum number of clients that may be registered at the same time. */
  val maximumClients = 1

  /** Registry of open connections keyed by connection id.
    *
    * The registry is held in the `clients` argument of the current behavior and is
    * replaced through `context.become` rather than mutated.
    */
  class ChatRef extends Actor {
    override def receive: Receive = withClients(Map.empty[UUID, ActorRef])

    /** Behavior for a given snapshot of registered clients.
      *
      * @param clients source actor of every registered connection, by connection id
      */
    def withClients(clients: Map[UUID, ActorRef]): Receive = {
      // Echo the message to the sending connection; ids that are not registered
      // (e.g. a client that was turned away) are ignored. Direct lookup replaces the
      // original full-map `collect` that was used only for its side effect.
      case Protocol.SignedMessage(uuid, msg) =>
        clients.get(uuid).foreach(_ ! msg)

      // Registry is full: stop the newcomer's source actor instead of registering it.
      // `>=` keeps the limit enforced even if the registry were ever above the cap.
      case Protocol.OpenConnection(ar, _) if clients.size >= maximumClients =>
        ar ! PoisonPill

      case Protocol.OpenConnection(ar, uuid) =>
        context.become(withClients(clients.updated(uuid, ar)))

      case Protocol.CloseConnection(uuid) =>
        context.become(withClients(clients - uuid))
    }
  }

  /** Messages exchanged between the connection flows and `ChatRef`. */
  object Protocol {
    case class SignedMessage(uuid: UUID, msg: String)
    case class OpenConnection(actor: ActorRef, uuid: UUID)
    case class CloseConnection(uuid: UUID)
  }

  val chatRef = system.actorOf(Props[ChatRef])

  /** WebSocket pipeline: collapses every text frame (strict or streamed) into a single
    * `String`, routes it through a chat flow bound to a fresh connection id, and wraps
    * the replies as text messages. A binary frame fails the stream.
    */
  def websocketFlow: Flow[Message, Message, Any] =
    Flow[Message]
      .mapAsync(1) {
        case TextMessage.Strict(s) => Future.successful(s)
        case TextMessage.Streamed(s) => s.runFold("")(_ + _)
        case _: BinaryMessage =>
          Future.failed[String](new Exception("Binary message cannot be handled"))
      }
      .via(chatActorFlow(UUID.randomUUID()))
      .map(TextMessage(_))

  /** Bridges one connection and `chatRef`.
    *
    * Inbound strings are signed with `connectionId` and forwarded to `chatRef`, which
    * receives `Protocol.CloseConnection(connectionId)` when the inbound side completes.
    * The outbound side is an actor-backed source (buffer of 16, failing on overflow)
    * whose `ActorRef` is announced to `chatRef` via `Protocol.OpenConnection`.
    *
    * @param connectionId identifier attached to everything this connection sends to `chatRef`
    */
  def chatActorFlow(connectionId: UUID): Flow[String, String, Any] = {
    val sink = Flow[String]
      .map(msg => Protocol.SignedMessage(connectionId, msg))
      .to(Sink.actorRef(chatRef, Protocol.CloseConnection(connectionId)))

    val source = Source
      .actorRef[String](16, OverflowStrategy.fail)
      .mapMaterializedValue { actor: ActorRef =>
        chatRef ! Protocol.OpenConnection(actor, connectionId)
      }

    Flow.fromSinkAndSource(sink, source)
  }

  Http()
    .bindAndHandle(route, "0.0.0.0", 8080)
    .onComplete {
      case Success(_) =>
        println("Started server...")
      case Failure(cause) =>
        // A failed bind (e.g. port already in use) used to be dropped silently by `.map`;
        // report it and shut the actor system down so the process can exit.
        Console.err.println(s"Failed to bind to 0.0.0.0:8080: ${cause.getMessage}")
        system.terminate()
    }
}