如何对来自无限流的传入事件进行分组?
How to group incoming events from infinite stream?
我有无限的事件流:
(timestamp, session_uid, traffic)
即
...
(1448089943, session-1, 10)
(1448089944, session-1, 20)
(1448089945, session-2, 50)
(1448089946, session-1, 30)
(1448089947, session-2, 10)
(1448089948, session-3, 10)
...
我想按 session_uid 对这些事件进行分组并计算每个会话的流量总和。
我写了一个 akka-streams
流程,它可以很好地处理有限流使用 groupBy
(我的代码基于菜谱中的 this 示例)。但是对于无限流,它不会起作用,因为 groupBy
函数应该处理所有传入的流,并且只有在那之后才会准备好 return 结果。
我认为我应该实施超时分组,即如果我没有收到指定 stream_uid 的事件超过 5 分钟,我应该 return 为此 session_uid 分组事件.但是如何实现它只使用 akka-streams
?
也许你可以简单地通过 actor 实现它
case class SessionCount(name: String)
class Hello private() extends Actor {
var sessionMap = Map[String, Int]()
override def receive: Receive = {
case (_, session: String, _) =>
sessionMap = sessionMap + (session -> (sessionMap.getOrElse(session, 0) + 1))
case SessionCount(name: String) => sender() ! sessionMap.get(name).getOrElse(0)
}
}
object Hello {
private val actor = ActorSystem.apply().actorOf(Props(new Hello))
private implicit val timeOver = Timeout(10, TimeUnit.SECONDS)
type Value = (String, String, String)
def add(value: Value) = actor ! value
def count(name:String) = (actor ? SessionCount(name )).mapTo[Int]
}
我想出了一个有点 gnarly 的解决方案,但我认为它可以完成工作。
基本思想是使用 Source 的 keepAlive
方法作为触发完成的计时器。
但要做到这一点,我们首先必须对数据进行一些抽象。计时器将需要从原始源发送触发器或另一个元组值,因此:
sealed trait Data
object TimerTrigger extends Data
case class Value(tstamp : Long, session_uid : String, traffic : Int) extends Data
然后将我们的元组源转换为值源。我们仍将使用 groupBy
进行类似于您的有限流情况的分组:
val originalSource : Source[(Long, String, Int), Unit] = ???
type IDGroup = (String, Source[Value, Unit]) //uid -> Source of Values for uid
val groupedDataSource : Source[IDGroup, Unit] =
originalSource.map(t => Value(t._1, t._2, t._3))
.groupBy(_.session_uid)
棘手的部分是处理仅是元组的分组:(String, Source[Value,Unit])
。如果时间已经过去,我们需要计时器通知我们,所以我们需要另一个抽象来知道我们是否仍在计算或者由于超时我们已经完成计算:
sealed trait Sum {
val sum : Int
}
case class StillComputing(val sum : Int) extends Sum
case class ComputedSum(val sum : Int) extends Sum
val zeroSum : Sum = StillComputing(0)
现在我们可以吸取各组的Source了。如果值源在 timeOut
之后没有产生任何东西,keepAlive
将发送 TimerTrigger
。来自 keepAlive 的 Data
然后与 TimerTrigger 或来自原始源的新值进行模式匹配:
val evaluateSum : ((Sum , Data)) => Sum = {
case (runningSum, data) => {
data match {
case TimerTrigger => ComputedSum(runningSum.sum)
case v : Value => StillComputing(runningSum.sum + v.traffic)
}
}
}//end val evaluateSum
type SumResult = (String, Future[Int]) // uid -> Future of traffic sum for uid
def handleGroup(timeOut : FiniteDuration)(idGroup : IDGroup) : SumResult =
idGroup._1 -> idGroup._2.keepAlive(timeOut, () => TimerTrigger)
.scan(zeroSum)(evaluateSum)
.collect {case c : ComputedSum => c.sum}
.runWith(Sink.head)
集合应用于仅匹配完成总和的部分函数,因此只有在计时器触发后才会到达 Sink。
然后我们将此处理程序应用于每个出现的分组:
val timeOut = FiniteDuration(5, MINUTES)
val sumSource : Source[SumResult, Unit] =
groupedDataSource map handleGroup(timeOut)
我们现在有一个 (String,Future[Int])
的来源,即 session_uid 和该 ID 流量总和的未来。
就像我说的,绕口但符合要求。另外,我不完全确定如果一个 uid 已经分组并且已经超时,但是随后出现具有相同 uid 的新值会发生什么。
这似乎是 Source.groupedWithin
的用例:
def groupedWithin(n: Int, d: FiniteDuration): Source[List[Out], Mat]
"Chunk up this stream into groups of elements received within a time window, or limited by the given number of elements, whatever happens first."
我有无限的事件流:
(timestamp, session_uid, traffic)
即
...
(1448089943, session-1, 10)
(1448089944, session-1, 20)
(1448089945, session-2, 50)
(1448089946, session-1, 30)
(1448089947, session-2, 10)
(1448089948, session-3, 10)
...
我想按 session_uid 对这些事件进行分组并计算每个会话的流量总和。
我写了一个 akka-streams
流程,它可以很好地处理有限流使用 groupBy
(我的代码基于菜谱中的 this 示例)。但是对于无限流,它不会起作用,因为 groupBy
函数应该处理所有传入的流,并且只有在那之后才会准备好 return 结果。
我认为我应该实施超时分组,即如果我没有收到指定 stream_uid 的事件超过 5 分钟,我应该 return 为此 session_uid 分组事件.但是如何实现它只使用 akka-streams
?
也许你可以简单地通过 actor 实现它
case class SessionCount(name: String)
class Hello private() extends Actor {
var sessionMap = Map[String, Int]()
override def receive: Receive = {
case (_, session: String, _) =>
sessionMap = sessionMap + (session -> (sessionMap.getOrElse(session, 0) + 1))
case SessionCount(name: String) => sender() ! sessionMap.get(name).getOrElse(0)
}
}
object Hello {
private val actor = ActorSystem.apply().actorOf(Props(new Hello))
private implicit val timeOver = Timeout(10, TimeUnit.SECONDS)
type Value = (String, String, String)
def add(value: Value) = actor ! value
def count(name:String) = (actor ? SessionCount(name )).mapTo[Int]
}
我想出了一个有点 gnarly 的解决方案,但我认为它可以完成工作。
基本思想是使用 Source 的 keepAlive
方法作为触发完成的计时器。
但要做到这一点,我们首先必须对数据进行一些抽象。计时器将需要从原始源发送触发器或另一个元组值,因此:
sealed trait Data
object TimerTrigger extends Data
case class Value(tstamp : Long, session_uid : String, traffic : Int) extends Data
然后将我们的元组源转换为值源。我们仍将使用 groupBy
进行类似于您的有限流情况的分组:
val originalSource : Source[(Long, String, Int), Unit] = ???
type IDGroup = (String, Source[Value, Unit]) //uid -> Source of Values for uid
val groupedDataSource : Source[IDGroup, Unit] =
originalSource.map(t => Value(t._1, t._2, t._3))
.groupBy(_.session_uid)
棘手的部分是处理仅是元组的分组:(String, Source[Value,Unit])
。如果时间已经过去,我们需要计时器通知我们,所以我们需要另一个抽象来知道我们是否仍在计算或者由于超时我们已经完成计算:
sealed trait Sum {
val sum : Int
}
case class StillComputing(val sum : Int) extends Sum
case class ComputedSum(val sum : Int) extends Sum
val zeroSum : Sum = StillComputing(0)
现在我们可以吸取各组的Source了。如果值源在 timeOut
之后没有产生任何东西,keepAlive
将发送 TimerTrigger
。来自 keepAlive 的 Data
然后与 TimerTrigger 或来自原始源的新值进行模式匹配:
val evaluateSum : ((Sum , Data)) => Sum = {
case (runningSum, data) => {
data match {
case TimerTrigger => ComputedSum(runningSum.sum)
case v : Value => StillComputing(runningSum.sum + v.traffic)
}
}
}//end val evaluateSum
type SumResult = (String, Future[Int]) // uid -> Future of traffic sum for uid
def handleGroup(timeOut : FiniteDuration)(idGroup : IDGroup) : SumResult =
idGroup._1 -> idGroup._2.keepAlive(timeOut, () => TimerTrigger)
.scan(zeroSum)(evaluateSum)
.collect {case c : ComputedSum => c.sum}
.runWith(Sink.head)
集合应用于仅匹配完成总和的部分函数,因此只有在计时器触发后才会到达 Sink。
然后我们将此处理程序应用于每个出现的分组:
val timeOut = FiniteDuration(5, MINUTES)
val sumSource : Source[SumResult, Unit] =
groupedDataSource map handleGroup(timeOut)
我们现在有一个 (String,Future[Int])
的来源,即 session_uid 和该 ID 流量总和的未来。
就像我说的,绕口但符合要求。另外,我不完全确定如果一个 uid 已经分组并且已经超时,但是随后出现具有相同 uid 的新值会发生什么。
这似乎是 Source.groupedWithin
的用例:
def groupedWithin(n: Int, d: FiniteDuration): Source[List[Out], Mat]
"Chunk up this stream into groups of elements received within a time window, or limited by the given number of elements, whatever happens first."