从服务器关闭 akka-http websocket 连接
Close akka-http websocket connection from server
在我的场景中,客户端发送 "goodbye" websocket 消息,我需要关闭之前在服务器端建立的连接。
来自 akka-http docs:
Closing connections is possible by cancelling the incoming connection Flow from your server logic (e.g. by connecting its downstream to a Sink.cancelled and its upstream to a Source.empty). It is also possible to shut down the server's socket by cancelling the IncomingConnection source connections.
但考虑到 Sink
和 Source
在协商新连接时设置一次,我不清楚如何做到这一点:
(get & path("ws")) {
optionalHeaderValueByType[UpgradeToWebsocket]() {
case Some(upgrade) ⇒
val connectionId = UUID()
complete(upgrade.handleMessagesWithSinkSource(sink, source))
case None ⇒
reject(ExpectedWebsocketRequestRejection)
}
}
提示:此答案基于 akka-stream-experimental
版本 2.0-M2
。 API 在其他版本中可能略有不同。
关闭连接的一种简单方法是使用 PushStage
:
import akka.stream.stage._
val closeClient = new PushStage[String, String] {
override def onPush(elem: String, ctx: Context[String]) = elem match {
case "goodbye" ⇒
// println("Connection closed")
ctx.finish()
case msg ⇒
ctx.push(msg)
}
}
在客户端或服务器端接收到的每个元素(通常每个通过 Flow
的元素)都会经过这样一个 Stage
组件。在 Akka 中,完整的抽象称为 GraphStage
,更多信息可以在 official documentation.
中找到
使用 PushStage
我们可以观察具体的传入元素的值,然后相应地转换上下文。在上面的示例中,一旦收到 goodbye
消息,我们就完成了上下文,否则我们只是通过 push
方法转发值。
现在,我们可以通过 transform
方法将 closeClient
组件连接到任意流:
val connection = Tcp().outgoingConnection(address, port)
val flow = Flow[ByteString]
.via(Framing.delimiter(
ByteString("\n"),
maximumFrameLength = 256,
allowTruncation = true))
.map(_.utf8String)
.transform(() ⇒ closeClient)
.map(_ ⇒ StdIn.readLine("> "))
.map(_ + "\n")
.map(ByteString(_))
connection.join(flow).run()
上面的流接收一个ByteString
和returns一个ByteString
,这意味着它可以通过join
方法连接到connection
。在流内部,我们首先将字节转换为字符串,然后再将它们发送到 closeClient
。如果 PushStage
没有完成流,该元素将在流中转发,在那里它被丢弃并被来自标准输入的一些输入替换,然后通过线路发回。如果流完成,将删除阶段组件之后的所有进一步流处理步骤 - 流现已关闭。
这可以在当前 (2.4.14) 版本的 akka-stream 中通过以下方式完成
package com.trackabus.misc
import akka.stream.stage._
import akka.stream.{Attributes, FlowShape, Inlet, Outlet}
// terminates the flow based on a predicate for a message of type T
// if forwardTerminatingMessage is set the message is passed along the flow
// before termination
// if terminate is true the stage is failed, if it is false the stage is completed
class TerminateFlowStage[T](
pred: T => Boolean,
forwardTerminatingMessage: Boolean = false,
terminate: Boolean = true)
extends GraphStage[FlowShape[T, T]]
{
val in = Inlet[T]("TerminateFlowStage.in")
val out = Outlet[T]("TerminateFlowStage.out")
override val shape = FlowShape.of(in, out)
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
new GraphStageLogic(shape) {
setHandlers(in, out, new InHandler with OutHandler {
override def onPull(): Unit = { pull(in) }
override def onPush(): Unit = {
val chunk = grab(in)
if (pred(chunk)) {
if (forwardTerminatingMessage)
push(out, chunk)
if (terminate)
failStage(new RuntimeException("Flow terminated by TerminateFlowStage"))
else
completeStage()
}
else
push(out, chunk)
}
})
}
}
使用它定义你的阶段
val termOnKillMe = new TerminateFlowStage[Message](_.isInstanceOf[KillMe])
然后将其作为流程的一部分包含在内
.via(termOnKillMe)
另一种方法是使用 Source.queue 中的队列管理连接。队列可用于向客户端发送消息以及关闭连接。
def socketFlow: Flow[Message, Message, NotUsed] = {
val (queue, source) = Source.queue[Message](5, OverflowStrategy.fail).preMaterialize()
// receive client message
val sink = Sink.foreach[Message] {
case TextMessage.Strict("goodbye") =>
queue.complete() // this closes the connection
case TextMessage.Strict(text) =>
// send message to client by using offer
queue.offer(TextMessage(s"you sent $text"))
}
Flow.fromSinkAndSource(sink, source)
}
// you then produce the upgrade response like this
val response = upgrade.handleMessages(socketFlow)
使用 WebSocket 队列的一个好处是,只要您可以访问它,就可以随时使用它发送消息,而不必等待传入消息回复。
在我的场景中,客户端发送 "goodbye" websocket 消息,我需要关闭之前在服务器端建立的连接。
来自 akka-http docs:
Closing connections is possible by cancelling the incoming connection Flow from your server logic (e.g. by connecting its downstream to a Sink.cancelled and its upstream to a Source.empty). It is also possible to shut down the server's socket by cancelling the IncomingConnection source connections.
但考虑到 Sink
和 Source
在协商新连接时设置一次,我不清楚如何做到这一点:
(get & path("ws")) {
optionalHeaderValueByType[UpgradeToWebsocket]() {
case Some(upgrade) ⇒
val connectionId = UUID()
complete(upgrade.handleMessagesWithSinkSource(sink, source))
case None ⇒
reject(ExpectedWebsocketRequestRejection)
}
}
提示:此答案基于 akka-stream-experimental
版本 2.0-M2
。 API 在其他版本中可能略有不同。
关闭连接的一种简单方法是使用 PushStage
:
import akka.stream.stage._
val closeClient = new PushStage[String, String] {
override def onPush(elem: String, ctx: Context[String]) = elem match {
case "goodbye" ⇒
// println("Connection closed")
ctx.finish()
case msg ⇒
ctx.push(msg)
}
}
在客户端或服务器端接收到的每个元素(通常每个通过 Flow
的元素)都会经过这样一个 Stage
组件。在 Akka 中,完整的抽象称为 GraphStage
,更多信息可以在 official documentation.
使用 PushStage
我们可以观察具体的传入元素的值,然后相应地转换上下文。在上面的示例中,一旦收到 goodbye
消息,我们就完成了上下文,否则我们只是通过 push
方法转发值。
现在,我们可以通过 transform
方法将 closeClient
组件连接到任意流:
val connection = Tcp().outgoingConnection(address, port)
val flow = Flow[ByteString]
.via(Framing.delimiter(
ByteString("\n"),
maximumFrameLength = 256,
allowTruncation = true))
.map(_.utf8String)
.transform(() ⇒ closeClient)
.map(_ ⇒ StdIn.readLine("> "))
.map(_ + "\n")
.map(ByteString(_))
connection.join(flow).run()
上面的流接收一个ByteString
和returns一个ByteString
,这意味着它可以通过join
方法连接到connection
。在流内部,我们首先将字节转换为字符串,然后再将它们发送到 closeClient
。如果 PushStage
没有完成流,该元素将在流中转发,在那里它被丢弃并被来自标准输入的一些输入替换,然后通过线路发回。如果流完成,将删除阶段组件之后的所有进一步流处理步骤 - 流现已关闭。
这可以在当前 (2.4.14) 版本的 akka-stream 中通过以下方式完成
package com.trackabus.misc
import akka.stream.stage._
import akka.stream.{Attributes, FlowShape, Inlet, Outlet}
// terminates the flow based on a predicate for a message of type T
// if forwardTerminatingMessage is set the message is passed along the flow
// before termination
// if terminate is true the stage is failed, if it is false the stage is completed
class TerminateFlowStage[T](
pred: T => Boolean,
forwardTerminatingMessage: Boolean = false,
terminate: Boolean = true)
extends GraphStage[FlowShape[T, T]]
{
val in = Inlet[T]("TerminateFlowStage.in")
val out = Outlet[T]("TerminateFlowStage.out")
override val shape = FlowShape.of(in, out)
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
new GraphStageLogic(shape) {
setHandlers(in, out, new InHandler with OutHandler {
override def onPull(): Unit = { pull(in) }
override def onPush(): Unit = {
val chunk = grab(in)
if (pred(chunk)) {
if (forwardTerminatingMessage)
push(out, chunk)
if (terminate)
failStage(new RuntimeException("Flow terminated by TerminateFlowStage"))
else
completeStage()
}
else
push(out, chunk)
}
})
}
}
使用它定义你的阶段
val termOnKillMe = new TerminateFlowStage[Message](_.isInstanceOf[KillMe])
然后将其作为流程的一部分包含在内
.via(termOnKillMe)
另一种方法是使用 Source.queue 中的队列管理连接。队列可用于向客户端发送消息以及关闭连接。
def socketFlow: Flow[Message, Message, NotUsed] = {
val (queue, source) = Source.queue[Message](5, OverflowStrategy.fail).preMaterialize()
// receive client message
val sink = Sink.foreach[Message] {
case TextMessage.Strict("goodbye") =>
queue.complete() // this closes the connection
case TextMessage.Strict(text) =>
// send message to client by using offer
queue.offer(TextMessage(s"you sent $text"))
}
Flow.fromSinkAndSource(sink, source)
}
// you then produce the upgrade response like this
val response = upgrade.handleMessages(socketFlow)
使用 WebSocket 队列的一个好处是,只要您可以访问它,就可以随时使用它发送消息,而不必等待传入消息回复。