Akka:Websocket 处理程序失败,无法订阅关闭发布者
Akka: Websocket handler failed with Cannot subscribe to shut-down Publisher
我正在使用 Akka Streams 和 Akka Http 来实现 websocket 流。流使用队列作为源,TextMessage
的来源如下:
val (queue, source) = Source.queue[Message](0, OverflowStrategy.backpressure).recoverWithRetries(-1, {
case exception: Exception =>
println(exception)
Source(Nil)
}).preMaterialize()
def send[T](message: T)(implicit jsonFormat: JsonFormat[T]): Unit = queue.offer(TextMessage.Strict(message.toJson.toString()))
流程构建如下
val flow: Flow[Message, Message, NotUsed] = Flow.fromSinkAndSource(Sink.ignore, source).via(reportErrorsFlow)
def reportErrorsFlow[T]: Flow[T, T, Any] =
Flow[T]
.watchTermination()((_, f) => f.onComplete {
case Failure(cause) =>
println(s"WS stream failed with $cause")
case ex => println("Complete", ex) // ignore regular completion
})
然后提供给路由
val websocketRoute: Route =
path(pathName) {
handleWebSocketMessages(flow)
}
然后服务器启动
val routes = cors() {
concat(health, websocketRoute, ...other routes)
}
val bindingFuture = Http().newServerAt("localhost", 8080).bind(routes)
println("Server online at http://localhost:8080/")
println("Press RETURN to stop...")
StdIn.readLine() // let it run until user presses return
bindingFuture
.flatMap(_.unbind()) // trigger unbinding from the port
.onComplete(_ => system.terminate())
我 运行 遇到的问题是网络套接字没有正常关闭,当消息正在传输时关闭网络套接字会导致流程崩溃,我在尝试再次连接时收到以下错误
Websocket handler failed with Cannot subscribe to shut-down Publisher (akka.stream.impl.ActorPublisher$NormalShutdownException: Cannot subscribe to shut-down Publisher)
您收到该错误的原因是因为您尝试在您的 websocket 处理程序中重用预先实现的 Source
。
当您预先实现 Source
时,您会得到具体化的值(在您的例子中是队列)和与该具体化相关联的 一次性使用 Source
值。
您需要做的是,每次获得 websocket 连接时,您都应该从头开始设置所有流。在您的情况下,这意味着每次您的路线触发时都要对 Source.queue
进行预先处理。所以不要将 flow
val 传递给 handleWebSocketMessages
指令,调用 returns 一个 Flow
的函数,说 flow()
将设置所有内容。
我正在使用 Akka Streams 和 Akka Http 来实现 websocket 流。流使用队列作为源,TextMessage
的来源如下:
val (queue, source) = Source.queue[Message](0, OverflowStrategy.backpressure).recoverWithRetries(-1, {
case exception: Exception =>
println(exception)
Source(Nil)
}).preMaterialize()
def send[T](message: T)(implicit jsonFormat: JsonFormat[T]): Unit = queue.offer(TextMessage.Strict(message.toJson.toString()))
流程构建如下
val flow: Flow[Message, Message, NotUsed] = Flow.fromSinkAndSource(Sink.ignore, source).via(reportErrorsFlow)
def reportErrorsFlow[T]: Flow[T, T, Any] =
Flow[T]
.watchTermination()((_, f) => f.onComplete {
case Failure(cause) =>
println(s"WS stream failed with $cause")
case ex => println("Complete", ex) // ignore regular completion
})
然后提供给路由
val websocketRoute: Route =
path(pathName) {
handleWebSocketMessages(flow)
}
然后服务器启动
val routes = cors() {
concat(health, websocketRoute, ...other routes)
}
val bindingFuture = Http().newServerAt("localhost", 8080).bind(routes)
println("Server online at http://localhost:8080/")
println("Press RETURN to stop...")
StdIn.readLine() // let it run until user presses return
bindingFuture
.flatMap(_.unbind()) // trigger unbinding from the port
.onComplete(_ => system.terminate())
我 运行 遇到的问题是网络套接字没有正常关闭,当消息正在传输时关闭网络套接字会导致流程崩溃,我在尝试再次连接时收到以下错误
Websocket handler failed with Cannot subscribe to shut-down Publisher (akka.stream.impl.ActorPublisher$NormalShutdownException: Cannot subscribe to shut-down Publisher)
您收到该错误的原因是因为您尝试在您的 websocket 处理程序中重用预先实现的 Source
。
当您预先实现 Source
时,您会得到具体化的值(在您的例子中是队列)和与该具体化相关联的 一次性使用 Source
值。
您需要做的是,每次获得 websocket 连接时,您都应该从头开始设置所有流。在您的情况下,这意味着每次您的路线触发时都要对 Source.queue
进行预先处理。所以不要将 flow
val 传递给 handleWebSocketMessages
指令,调用 returns 一个 Flow
的函数,说 flow()
将设置所有内容。