Scala & Play Websockets:存储交换的消息

Scala & Play Websockets: Storing messages exchanged

我开始玩 scala,并在 scala 中找到了这个特定的 web 套接字聊天室样板。

他们使用 MessageHub.source()BroadcastHub.sink() 作为将消息发送到所有连接的客户端的源和接收器。

该示例按原样可以很好地交换消息。

private val (chatSink, chatSource) = {
  // Don't log MergeHub$ProducerFailed as error if the client disconnects.
  // recoverWithRetries -1 is essentially "recoverWith"
  val source = MergeHub.source[WSMessage]
    .log("source")
    .recoverWithRetries(-1, { case _: Exception ⇒ Source.empty })

  val sink = BroadcastHub.sink[WSMessage]
  source.toMat(sink)(Keep.both).run()
}

private val userFlow: Flow[WSMessage, WSMessage, _] = {
 Flow.fromSinkAndSource(chatSink, chatSource)
}

def chat(): WebSocket = {
  WebSocket.acceptOrResult[WSMessage, WSMessage] {
    case rh if sameOriginCheck(rh) =>
      Future.successful(userFlow).map { flow =>
        Right(flow)
      }.recover {
        case e: Exception =>
          val msg = "Cannot create websocket"
          logger.error(msg, e)
          val result = InternalServerError(msg)
          Left(result)
      }

    case rejected =>
      logger.error(s"Request ${rejected} failed same origin check")
      Future.successful {
      Left(Forbidden("forbidden"))
      }
  }
}

我想将聊天室中交换的消息存储在数据库中。

我尝试将 map 和 fold 函数添加到 source 和 sink 以获取发送的消息,但我无法做到。

我尝试在 MergeHub 和 BroadcastHub 之间添加一个流阶段,如下所示

val flow = Flow[WSMessage].map(element => println(s"Message: $element"))
source.via(flow).toMat(sink)(Keep.both).run()

但它会抛出一个编译错误,无法引用具有此类签名的 toMat。

有人可以帮助我或告诉我如何获取发送的消息并将它们存储在数据库中。

Link 完整模板:

https://github.com/playframework/play-scala-chatroom-example

让我们看看你的流程:

val flow = Flow[WSMessage].map(element => println(s"Message: $element"))

它接受 WSMessage 类型的元素,returns 什么都不接受 (Unit)。这里又是正确的类型:

val flow: Flow[Unit] = Flow[WSMessage].map(element => println(s"Message: $element"))

这显然不会像接收器预期的那样工作 WSMessage 而不是 Unit

解决上述问题的方法如下:

val flow = Flow[WSMessage].map { element =>
  println(s"Message: $element")
  element
 }

不是为了在数据库中持久化消息,您很可能希望使用异步阶段,大致如下:

val flow = Flow[WSMessage].mapAsync(parallelism) { element =>
  println(s"Message: $element")
  // assuming DB.write() returns a Future[Unit]
  DB.write(element).map(_ => element)
 }