Akka websocket - 如何通过服务器关闭连接?

Akka websocket - how to close connection by server?

所以这是我的 websocket 服务器实现。

val route = get {
  pathEndOrSingleSlash {
    handleWebSocketMessages(websocketFlow)
  }
}

def websocketFlow: Flow[Message, Message, Any] =
  Flow[Message]
    .collect { case TextMessage.Strict(textMessage) => protocol.hydrate(textMessage) }
    .via(chatActorFlow(UUID.randomUUID()))
    .map(event => TextMessage.Strict(protocol.serialize(event)))


def chatActorFlow(connectionId: UUID) : Flow[Protocol.Message, Protocol.Event, Any] = {

  val sink = Flow[Protocol.Message]
    .map(msg => Protocol.SignedMessage(connectionId, msg))
    .to(Sink.actorRef(chatRef, Protocol.CloseConnection(connectionId)))

  val source = Source
      .mapMaterializedValue {
        actor : ActorRef => {
          chatRef ! Protocol.OpenConnection(actor, connectionId)
        }
      }

  Flow.fromSinkAndSource(sink, source)
}

我想知道 chatRef 发送 ConnectionClosed 类型的消息后是否有任何方法可以关闭连接?

下面的解决方案允许通过终止 Source.actorRef 阶段具体化的 Actor 来断开服务器端的连接。这只需向它发送 PoisonPill 即可。

现在,我仍然不清楚你想如何在连接时识别 "banned" 客户端,所以这个例子 - 故意 - 非常简单:服务器在连接后断开任何连接连接的最大客户端数量。如果您想随时使用任何其他策略踢出客户端,您仍然可以应用相同的逻辑并将 PoisonPill 发送到他们自己的源参与者。

object ChatApp extends App {

  implicit val system = ActorSystem("chat")
  implicit val executor: ExecutionContextExecutor = system.dispatcher
  implicit val materializer = ActorMaterializer()

  val route = get {
    pathEndOrSingleSlash {
      handleWebSocketMessages(websocketFlow)
    }
  }

  val maximumClients = 1

  class ChatRef extends Actor {
    override def receive: Receive = withClients(Map.empty[UUID, ActorRef])

    def withClients(clients: Map[UUID, ActorRef]): Receive = {
      case SignedMessage(uuid, msg) => clients.collect{
        case (id, ar) if id == uuid => ar ! msg
      }
      case OpenConnection(ar, uuid) if clients.size == maximumClients => ar ! PoisonPill
      case OpenConnection(ar, uuid) => context.become(withClients(clients.updated(uuid, ar)))
      case CloseConnection(uuid) => context.become(withClients(clients - uuid))
    }
  }

  object Protocol {
    case class SignedMessage(uuid: UUID, msg: String)
    case class OpenConnection(actor: ActorRef, uuid: UUID)
    case class CloseConnection(uuid: UUID)
  }

  val chatRef = system.actorOf(Props[ChatRef])

  def websocketFlow: Flow[Message, Message, Any] =
    Flow[Message]
      .mapAsync(1) {
        case TextMessage.Strict(s) => Future.successful(s)
        case TextMessage.Streamed(s) => s.runFold("")(_ + _)
        case b: BinaryMessage => throw new Exception("Binary message cannot be handled")
      }.via(chatActorFlow(UUID.randomUUID()))
      .map(TextMessage(_))

  def chatActorFlow(connectionId: UUID) : Flow[String, String, Any] = {

    val sink = Flow[String]
      .map(msg => Protocol.SignedMessage(connectionId, msg))
      .to(Sink.actorRef(chatRef, Protocol.CloseConnection(connectionId)))

    val source = Source.actorRef(16, OverflowStrategy.fail)
      .mapMaterializedValue {
        actor : ActorRef => {
          chatRef ! Protocol.OpenConnection(actor, connectionId)
        }
      }

    Flow.fromSinkAndSource(sink, source)
  }

  Http().bindAndHandle(route, "0.0.0.0", 8080)
    .map(_ => println(s"Started server..."))

}