如何测量 Akka WebSocket 流的吞吐量?
How does one measure throughput of Akka WebSocket stream?
我是 Akka 的新手,开发了一个示例 Akka WebSocket 服务器,它使用 BroadcastHub
(基于 Akka docs 的示例)将文件内容流式传输到客户端。
假设客户端的消耗速度与服务器一样快,我如何测量吞吐量 (messages/second)?
// file source
val fileSource = FileIO.fromPath(Paths.get(path)
// Akka file source
val theFileSource = fileSource
.toMat(BroadcastHub.sink)(Keep.right)
.run
//Akka kafka file source
lazy val kafkaSourceActorStream = {
val (kafkaSourceActorRef, kafkaSource) = Source.actorRef[String](Int.MaxValue, OverflowStrategy.fail)
.toMat(BroadcastHub.sink)(Keep.both).run()
Consumer.plainSource(consumerSettings, Subscriptions.topics("perf-test-topic"))
.runForeach(record => kafkaSourceActorRef ! record.value().toString)
}
def logicFlow: Flow[String, String, NotUsed] = Flow.fromSinkAndSource(Sink.ignore, theFileSource)
val websocketFlow: Flow[Message, Message, Any] = {
Flow[Message]
.collect {
case TextMessage.Strict(msg) => Future.successful(msg)
case _ => println("ignore streamed message")
}
.mapAsync(parallelism = 2)(identity)
.via(logicFlow)
.map { msg: String => TextMessage.Strict(msg) }
}
val fileRoute =
path("file") {
handleWebSocketMessages(websocketFlow)
}
}
def startServer(): Unit = {
bindingFuture = Http().bindAndHandle(wsRoutes, HOST, PORT)
log.info(s"Server online at http://localhost:9000/")
}
def stopServer(): Unit = {
bindingFuture
.flatMap(_.unbind())
.onComplete{
_ => system.terminate()
log.info("terminated")
}
}
//ws client
def connectToWebSocket(url: String) = {
println("Connecting to websocket: " + url)
val (upgradeResponse, closed) = Http().singleWebSocketRequest(WebSocketRequest(url), websocketFlow)
val connected = upgradeResponse.flatMap{ upgrade =>
if(upgrade.response.status == StatusCodes.SwitchingProtocols )
{
println("Web socket connection success")
Future.successful(Done)
}else {
println("Web socket connection failed with error: {}", upgrade.response.status)
throw new RuntimeException(s"Web socket connection failed: ${upgrade.response.status}")
}
}
connected.onComplete { msg =>
println(msg)
}
}
def websocketFlow: Flow[Message, Message, _] = {
Flow.fromSinkAndSource(printFlowRate, Source.maybe)
}
lazy val printFlowRate =
Flow[Message]
.alsoTo(fileSink("output.txt"))
.via(flowRate(1.seconds))
.to(Sink.foreach(rate => println(s"$rate")))
def flowRate(sampleTime: FiniteDuration) =
Flow[Message]
.conflateWithSeed(_ ⇒ 1){ case (acc, _) ⇒ acc + 1 }
.zip(Source.tick(sampleTime, sampleTime, NotUsed))
.map(_._1.toDouble / sampleTime.toUnit(SECONDS))
def fileSink(file: String): Sink[Message, Future[IOResult]] = {
Flow[Message]
.map{
case TextMessage.Strict(msg) => msg
case TextMessage.Streamed(stream) => stream.runFold("")(_ + _).flatMap(msg => Future.successful(msg))
}
.map(s => ByteString(s + "\n"))
.toMat(FileIO.toFile(new File(file)))(Keep.right)
}
我上次工作的地方实施了这种性质的性能基准。
基本上,这意味着创建一个简单的客户端应用程序来使用来自 websocket 的消息并输出一些指标。自然的选择是使用 akka-http client-side 对 websockets 的支持来实现客户端。参见:
https://doc.akka.io/docs/akka-http/current/client-side/websocket-support.html#singlewebsocketrequest
然后我们使用 micrometer 库向 Prometheus 公开指标,这是我们选择的报告和图表工具。
您可以将 throughput-measuring 流附加到现有流。这是一个受 启发的示例,它每秒打印从上游源发出的整数数量:
val rateSink = Flow[Int]
.conflateWithSeed(_ => 0){ case (acc, _) => acc + 1 }
.zip(Source.tick(1.second, 1.second, NotUsed))
.map(_._1)
.toMat(Sink.foreach(i => println(s"$i elements/second")))(Keep.right)
在下面的示例中,我们将上述接收器附加到发出 1 到 1000 万整数的源。为了防止 rate-measuring 流干扰主流(在这种情况下,它只是将每个整数转换为字符串,returns 最后一个字符串作为物化值的一部分处理),我们使用 wireTapMat
:
val (rateFut, mainFut) = Source(1 to 10000000)
.wireTapMat(rateSink)(Keep.right)
.map(_.toString)
.toMat(Sink.last[String])(Keep.both)
.run() // (Future[Done], Future[String])
rateFut onComplete {
case Success(x) => println(s"rateFut completed: $x")
case Failure(_) =>
}
mainFut onComplete {
case Success(s) => println(s"mainFut completed: $s")
case Failure(_) =>
}
运行 上面的示例打印如下内容:
0 elements/second
2597548 elements/second
3279052 elements/second
mainFut completed: 10000000
3516141 elements/second
607254 elements/second
rateFut completed: Done
如果不需要引用 rateSink
的物化值,请使用 wireTap
而不是 wireTapMat
。例如,将 rateSink
附加到您的 WebSocket 流可能如下所示:
val websocketFlow: Flow[Message, Message, Any] = {
Flow[Message]
.wireTap(rateSink) // <---
.collect {
case TextMessage.Strict(msg) => Future.successful(msg)
case _ => println("ignore streamed message")
}
.mapAsync(parallelism = 2)(identity)
.via(logicFlow)
.map { msg: String => TextMessage.Strict(msg) }
}
wireTap
在 Source
和 Flow
上定义。
我是 Akka 的新手,开发了一个示例 Akka WebSocket 服务器,它使用 BroadcastHub
(基于 Akka docs 的示例)将文件内容流式传输到客户端。
假设客户端的消耗速度与服务器一样快,我如何测量吞吐量 (messages/second)?
// file source
val fileSource = FileIO.fromPath(Paths.get(path)
// Akka file source
val theFileSource = fileSource
.toMat(BroadcastHub.sink)(Keep.right)
.run
//Akka kafka file source
lazy val kafkaSourceActorStream = {
val (kafkaSourceActorRef, kafkaSource) = Source.actorRef[String](Int.MaxValue, OverflowStrategy.fail)
.toMat(BroadcastHub.sink)(Keep.both).run()
Consumer.plainSource(consumerSettings, Subscriptions.topics("perf-test-topic"))
.runForeach(record => kafkaSourceActorRef ! record.value().toString)
}
def logicFlow: Flow[String, String, NotUsed] = Flow.fromSinkAndSource(Sink.ignore, theFileSource)
val websocketFlow: Flow[Message, Message, Any] = {
Flow[Message]
.collect {
case TextMessage.Strict(msg) => Future.successful(msg)
case _ => println("ignore streamed message")
}
.mapAsync(parallelism = 2)(identity)
.via(logicFlow)
.map { msg: String => TextMessage.Strict(msg) }
}
val fileRoute =
path("file") {
handleWebSocketMessages(websocketFlow)
}
}
def startServer(): Unit = {
bindingFuture = Http().bindAndHandle(wsRoutes, HOST, PORT)
log.info(s"Server online at http://localhost:9000/")
}
def stopServer(): Unit = {
bindingFuture
.flatMap(_.unbind())
.onComplete{
_ => system.terminate()
log.info("terminated")
}
}
//ws client
def connectToWebSocket(url: String) = {
println("Connecting to websocket: " + url)
val (upgradeResponse, closed) = Http().singleWebSocketRequest(WebSocketRequest(url), websocketFlow)
val connected = upgradeResponse.flatMap{ upgrade =>
if(upgrade.response.status == StatusCodes.SwitchingProtocols )
{
println("Web socket connection success")
Future.successful(Done)
}else {
println("Web socket connection failed with error: {}", upgrade.response.status)
throw new RuntimeException(s"Web socket connection failed: ${upgrade.response.status}")
}
}
connected.onComplete { msg =>
println(msg)
}
}
def websocketFlow: Flow[Message, Message, _] = {
Flow.fromSinkAndSource(printFlowRate, Source.maybe)
}
lazy val printFlowRate =
Flow[Message]
.alsoTo(fileSink("output.txt"))
.via(flowRate(1.seconds))
.to(Sink.foreach(rate => println(s"$rate")))
def flowRate(sampleTime: FiniteDuration) =
Flow[Message]
.conflateWithSeed(_ ⇒ 1){ case (acc, _) ⇒ acc + 1 }
.zip(Source.tick(sampleTime, sampleTime, NotUsed))
.map(_._1.toDouble / sampleTime.toUnit(SECONDS))
def fileSink(file: String): Sink[Message, Future[IOResult]] = {
Flow[Message]
.map{
case TextMessage.Strict(msg) => msg
case TextMessage.Streamed(stream) => stream.runFold("")(_ + _).flatMap(msg => Future.successful(msg))
}
.map(s => ByteString(s + "\n"))
.toMat(FileIO.toFile(new File(file)))(Keep.right)
}
我上次工作的地方实施了这种性质的性能基准。
基本上,这意味着创建一个简单的客户端应用程序来使用来自 websocket 的消息并输出一些指标。自然的选择是使用 akka-http client-side 对 websockets 的支持来实现客户端。参见:
https://doc.akka.io/docs/akka-http/current/client-side/websocket-support.html#singlewebsocketrequest
然后我们使用 micrometer 库向 Prometheus 公开指标,这是我们选择的报告和图表工具。
您可以将 throughput-measuring 流附加到现有流。这是一个受
val rateSink = Flow[Int]
.conflateWithSeed(_ => 0){ case (acc, _) => acc + 1 }
.zip(Source.tick(1.second, 1.second, NotUsed))
.map(_._1)
.toMat(Sink.foreach(i => println(s"$i elements/second")))(Keep.right)
在下面的示例中,我们将上述接收器附加到发出 1 到 1000 万整数的源。为了防止 rate-measuring 流干扰主流(在这种情况下,它只是将每个整数转换为字符串,returns 最后一个字符串作为物化值的一部分处理),我们使用 wireTapMat
:
val (rateFut, mainFut) = Source(1 to 10000000)
.wireTapMat(rateSink)(Keep.right)
.map(_.toString)
.toMat(Sink.last[String])(Keep.both)
.run() // (Future[Done], Future[String])
rateFut onComplete {
case Success(x) => println(s"rateFut completed: $x")
case Failure(_) =>
}
mainFut onComplete {
case Success(s) => println(s"mainFut completed: $s")
case Failure(_) =>
}
运行 上面的示例打印如下内容:
0 elements/second
2597548 elements/second
3279052 elements/second
mainFut completed: 10000000
3516141 elements/second
607254 elements/second
rateFut completed: Done
如果不需要引用 rateSink
的物化值,请使用 wireTap
而不是 wireTapMat
。例如,将 rateSink
附加到您的 WebSocket 流可能如下所示:
val websocketFlow: Flow[Message, Message, Any] = {
Flow[Message]
.wireTap(rateSink) // <---
.collect {
case TextMessage.Strict(msg) => Future.successful(msg)
case _ => println("ignore streamed message")
}
.mapAsync(parallelism = 2)(identity)
.via(logicFlow)
.map { msg: String => TextMessage.Strict(msg) }
}
wireTap
在 Source
和 Flow
上定义。