How to log flow rate in Akka Stream?
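I have an Akka Stream application with a single flow/graph. I want to measure the flow rate at the source and log it every 5 seconds, e.g. 'received 3 messages in the last 5 seconds'. I tried:
    someOtherFlow
      .groupedWithin(Integer.MAX_VALUE, 5.seconds)
      .runForeach(seq =>
        log.debug(s"received ${seq.length} messages in the last 5 seconds")
      )
But this only produces output when there are messages; it does not emit an empty list when 0 messages arrive. I want the zeros as well. Is this possible?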
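You could try something like
    src
      .conflateWithSeed(_ => 1) { case (acc, _) => acc + 1 }
      .zip(Source.tick(5.seconds, 5.seconds, NotUsed))
      .map(_._1)
This should batch your elements until the tick releases them. It is inspired by an example in the docs.
On a different note, if you need this for monitoring purposes, you could use a third-party tool for it, e.g. Kamon.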
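As far as I can tell, the zip in the snippet above still has to wait for conflateWithSeed to produce a value, so a completely quiet interval may yield no output rather than a 0. One possible alternative, which is not taken from the answers in this thread, is to merge the ticks into the element stream and count between ticks. The names Event, Msg, Tick, newCounter and countPerInterval below are hypothetical and introduced only for this sketch:

```scala
import akka.NotUsed
import akka.stream.scaladsl.{Flow, Source}
import scala.concurrent.duration._

object CountPerInterval {
  // Hypothetical marker types, used only inside this sketch.
  sealed trait Event
  case object Msg extends Event
  case object Tick extends Event

  // Fresh mutable counter: Msg increments and emits nothing,
  // Tick emits the current count (possibly 0) and resets it.
  def newCounter(): Event => List[Int] = {
    var count = 0
    (event: Event) => event match {
      case Msg =>
        count += 1
        Nil
      case Tick =>
        val c = count
        count = 0
        List(c)
    }
  }

  // Emits one Int per tick: the number of elements seen since the previous tick.
  // Assumption: eagerComplete = true so the flow ends when upstream ends;
  // the count of the final partial interval is dropped in that case.
  def countPerInterval[T](interval: FiniteDuration): Flow[T, Int, NotUsed] =
    Flow[T]
      .map[Event](_ => Msg)
      .merge(Source.tick[Event](interval, interval, Tick), eagerComplete = true)
      .statefulMapConcat(() => newCounter())
}
```

With this, something like src.via(CountPerInterval.countPerInterval[A](5.seconds)), where A is the element type of src, followed by runForeach should log a count every 5 seconds, including zeros, as long as downstream keeps up with the ticks.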
Extending Stefano's answer a little, I created the following flows:
    def flowRate[T](metric: T => Int = (_: T) => 1, outputDelay: FiniteDuration = 1.second): Flow[T, Double, NotUsed] =
      Flow[T]
        .conflateWithSeed(metric(_)) { case (acc, x) => acc + metric(x) }
        .zip(Source.tick(outputDelay, outputDelay, NotUsed))
        .map(_._1.toDouble / outputDelay.toUnit(SECONDS))

    def printFlowRate[T](name: String, metric: T => Int = (_: T) => 1,
                         outputDelay: FiniteDuration = 1.second): Flow[T, T, NotUsed] =
      Flow[T]
        .alsoTo(flowRate[T](metric, outputDelay)
          .to(Sink.foreach(r => log.info(s"Rate($name): $r"))))
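The first converts the flow into a rate per second. You can supply a metric, which assigns a value to each object passing through. Say you want to measure the rate of characters in a flow of strings; then you could pass _.length. The second parameter is the delay between flow rate reports (defaults to one second).
The second flow can be used inline to print the flow rate for debugging purposes without modifying the values passing through the stream. For example:
    stringFlow
      .via(printFlowRate[String]("Char rate", _.length, 10.seconds))
      .map(_.toLowerCase) // still a string
      ...
This will show the average rate of characters (per second) every 10 seconds.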
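To make the per-second normalisation concrete, here is a small standalone sketch of the same division that flowRate performs, using only the Scala standard library. The names RateArithmetic and perSecond are made up for this illustration:

```scala
import scala.concurrent.duration._

object RateArithmetic {
  // Same arithmetic as in flowRate: accumulated metric divided by
  // the window length expressed in (fractional) seconds.
  def perSecond(total: Int, window: FiniteDuration): Double =
    total.toDouble / window.toUnit(SECONDS)

  def main(args: Array[String]): Unit = {
    println(perSecond(30, 10.seconds))      // 30 characters over 10 s  -> 3.0
    println(perSecond(5, 500.milliseconds)) // 5 elements over 0.5 s    -> 10.0
  }
}
```

So with a 10-second outputDelay, 30 characters accumulated in one window are reported as a rate of 3.0 per second.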
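N.B. The flowRate above, however, lags by one outputDelay period, because zip consumes an element from conflate first and then waits for a tick (this can easily be verified by putting a log after conflateWithSeed). To obtain a non-lagging flow rate (metric), you can duplicate the tick, which forces zip to consume a second, fresh element from conflate, and then aggregate both ticks, i.e.:
    Flow[T]
      .conflateWithSeed(metric(_)) { case (acc, x) => acc + metric(x) }
      .zip(Source.tick(outputDelay, outputDelay, NotUsed)
        .mapConcat(_ => Seq(NotUsed, NotUsed))
      )
      .grouped(2).map {
        case Seq((a, _), (b, _)) => a + b
      }
      .map(_.toDouble / outputDelay.toUnit(SECONDS))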
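An example of Akka Stream logging:
    import akka.{Done, NotUsed}
    import akka.actor.ActorSystem
    import akka.event.{Logging, LoggingAdapter}
    import akka.stream.{ActorMaterializer, Attributes}
    import akka.stream.scaladsl.{Sink, Source}
    import scala.concurrent.{ExecutionContextExecutor, Future}
    import scala.concurrent.duration._
    import scala.util.{Failure, Random, Success}

    implicit val system: ActorSystem = ActorSystem("StreamLoggingActorSystem")
    implicit val materializer: ActorMaterializer = ActorMaterializer()
    // Picked up implicitly by the log stage below
    implicit val adapter: LoggingAdapter = Logging(system, "customLogger")
    implicit val ec: ExecutionContextExecutor = system.dispatcher

    def randomInt = Random.nextInt()

    // Endless source of random integers
    val source = Source.repeat(NotUsed).map(_ => randomInt)

    // Group elements into 5-second windows and log the size of each window
    val logger = source
      .groupedWithin(Integer.MAX_VALUE, 5.seconds)
      .log("in the last 5 seconds number of messages received : ", _.size)
      .withAttributes(
        Attributes.logLevels(
          onElement = Logging.WarningLevel,
          onFinish = Logging.InfoLevel,
          onFailure = Logging.DebugLevel
        )
      )

    val sink = Sink.ignore
    val result: Future[Done] = logger.runWith(sink)

    result.onComplete {
      case Success(_) =>
        println("end of stream")
      case Failure(_) =>
        println("stream ended with failure")
    }
The source code is here.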