Akka:Websocket 处理程序失败,无法订阅关闭发布者

Akka: Websocket handler failed with Cannot subscribe to shut-down Publisher

我正在使用 Akka Streams 和 Akka Http 来实现 websocket 流。流使用队列作为源,TextMessage 的来源如下:

  val (queue, source) = Source.queue[Message](0, OverflowStrategy.backpressure).recoverWithRetries(-1, {
    case exception: Exception =>
      println(exception)
      Source(Nil)
  }).preMaterialize()

  def send[T](message: T)(implicit jsonFormat: JsonFormat[T]): Unit = queue.offer(TextMessage.Strict(message.toJson.toString()))

流程构建如下

 val flow: Flow[Message, Message, NotUsed] = Flow.fromSinkAndSource(Sink.ignore, source).via(reportErrorsFlow)

  def reportErrorsFlow[T]: Flow[T, T, Any] =
    Flow[T]
      .watchTermination()((_, f) => f.onComplete {
        case Failure(cause) =>
          println(s"WS stream failed with $cause")
        case ex => println("Complete", ex) // ignore regular completion
      })

然后提供给路由

  val websocketRoute: Route =
    path(pathName) {
      handleWebSocketMessages(flow)
    }

然后服务器启动

  val routes = cors() {
    concat(health, websocketRoute, ...other routes)
  }
  val bindingFuture = Http().newServerAt("localhost", 8080).bind(routes)
  println("Server online at http://localhost:8080/")
  println("Press RETURN to stop...")
  StdIn.readLine() // let it run until user presses return
  bindingFuture
    .flatMap(_.unbind()) // trigger unbinding from the port
    .onComplete(_ => system.terminate())

我 运行 遇到的问题是网络套接字没有正常关闭,当消息正在传输时关闭网络套接字会导致流程崩溃,我在尝试再次连接时收到以下错误

Websocket handler failed with Cannot subscribe to shut-down Publisher (akka.stream.impl.ActorPublisher$NormalShutdownException: Cannot subscribe to shut-down Publisher)

您收到该错误的原因是因为您尝试在您的 websocket 处理程序中重用预先实现的 Source

当您预先实现 Source 时,您会得到具体化的值(在您的例子中是队列)和与该具体化相关联的 一次性使用 Source值。

您需要做的是,每次获得 websocket 连接时,您都应该从头开始设置所有流。在您的情况下,这意味着每次您的路线触发时都要对 Source.queue 进行预先处理。所以不要将 flow val 传递给 handleWebSocketMessages 指令,调用 returns 一个 Flow 的函数,说 flow() 将设置所有内容。