如何使用 mapWithState 提取超时会话
How to extract timed-out sessions using mapWithState
我正在更新我的代码以从 updateStateByKey
切换到 mapWithState
,以便根据 2 分钟的超时(2 仅用于测试目的)获取用户会话。每个会话应在超时前聚合会话中的所有流数据(JSON 字符串)。
这是我的旧代码:
val membersSessions = stream.map[(String, (Long, Long, List[String]))](eventRecord => {
val parsed = Utils.parseJSON(eventRecord)
val member_id = parsed.getOrElse("member_id", "")
val timestamp = parsed.getOrElse("timestamp", "").toLong
//The timestamp is returned twice because the first one will be used as the start time and the second one as the end time
(member_id, (timestamp, timestamp, List(eventRecord)))
})
val latestSessionInfo = membersSessions.map[(String, (Long, Long, Long, List[String]))](a => {
//transform to (member_id, (time, time, counter, events within session))
(a._1, (a._2._1, a._2._2, 1, a._2._3))
}).
reduceByKey((a, b) => {
//transform to (member_id, (lowestStartTime, MaxFinishTime, sumOfCounter, events within session))
(Math.min(a._1, b._1), Math.max(a._2, b._2), a._3 + b._3, a._4 ++ b._4)
}).updateStateByKey(Utils.updateState)
updateStateByKey
的问题很好地解释了 here。我决定使用 mapWithState
的关键原因之一是因为 updateStateByKey
无法 return 完成会话(超时的会话)进一步处理。
这是我第一次尝试将旧代码转换为新版本:
val spec = StateSpec.function(updateState _).timeout(Minutes(1))
val latestSessionInfo = membersSessions.map[(String, (Long, Long, Long, List[String]))](a => {
//transform to (member_id, (time, time, counter, events within session))
(a._1, (a._2._1, a._2._2, 1, a._2._3))
})
val userSessionSnapshots = latestSessionInfo.mapWithState(spec).snapshotStream()
我有点误解updateState
的内容应该是什么,因为据我了解超时不应该手动计算(以前在我的函数Utils.updateState
中完成)并且.snapshotStream
应该 return 超时会话。
假设您 总是 等待 2 分钟的超时,您可以让 mapWithState
流仅在触发超时后输出数据。
这对您的代码意味着什么?这意味着您现在需要监视超时而不是在每次迭代中输出元组。我想你的 mapWithState
看起来应该是这样的:
def updateState(key: String,
value: Option[(Long, Long, Long, List[String])],
state: State[(Long, Long, Long, List[String])]): Option[(Long, Long, Long, List[String])] = {
def reduce(first: (Long, Long, Long, List[String]), second: (Long, Long, Long, List[String])) = {
(Math.min(first._1, second._1), Math.max(first._2, second._2), first._3 + second._3, first._4 ++ second._4)
}
value match {
case Some(currentValue) =>
val result = state
.getOption()
.map(currentState => reduce(currentState, currentValue))
.getOrElse(currentValue)
state.update(result)
None
case _ if state.isTimingOut() => state.getOption()
}
}
这样,如果状态已经超时,你只输出一些外部的东西到流,否则你在状态内聚合它。
这意味着您的 Spark DStream 图表可以过滤掉所有未定义的值,并且只保留那些:
latestSessionInfo
.mapWithState(spec)
.filter(_.isDefined)
在 filter
之后,您将只有超时的状态。
我正在更新我的代码以从 updateStateByKey
切换到 mapWithState
,以便根据 2 分钟的超时(2 仅用于测试目的)获取用户会话。每个会话应在超时前聚合会话中的所有流数据(JSON 字符串)。
这是我的旧代码:
val membersSessions = stream.map[(String, (Long, Long, List[String]))](eventRecord => {
val parsed = Utils.parseJSON(eventRecord)
val member_id = parsed.getOrElse("member_id", "")
val timestamp = parsed.getOrElse("timestamp", "").toLong
//The timestamp is returned twice because the first one will be used as the start time and the second one as the end time
(member_id, (timestamp, timestamp, List(eventRecord)))
})
val latestSessionInfo = membersSessions.map[(String, (Long, Long, Long, List[String]))](a => {
//transform to (member_id, (time, time, counter, events within session))
(a._1, (a._2._1, a._2._2, 1, a._2._3))
}).
reduceByKey((a, b) => {
//transform to (member_id, (lowestStartTime, MaxFinishTime, sumOfCounter, events within session))
(Math.min(a._1, b._1), Math.max(a._2, b._2), a._3 + b._3, a._4 ++ b._4)
}).updateStateByKey(Utils.updateState)
updateStateByKey
的问题很好地解释了 here。我决定使用 mapWithState
的关键原因之一是因为 updateStateByKey
无法 return 完成会话(超时的会话)进一步处理。
这是我第一次尝试将旧代码转换为新版本:
val spec = StateSpec.function(updateState _).timeout(Minutes(1))
val latestSessionInfo = membersSessions.map[(String, (Long, Long, Long, List[String]))](a => {
//transform to (member_id, (time, time, counter, events within session))
(a._1, (a._2._1, a._2._2, 1, a._2._3))
})
val userSessionSnapshots = latestSessionInfo.mapWithState(spec).snapshotStream()
我有点误解updateState
的内容应该是什么,因为据我了解超时不应该手动计算(以前在我的函数Utils.updateState
中完成)并且.snapshotStream
应该 return 超时会话。
假设您 总是 等待 2 分钟的超时,您可以让 mapWithState
流仅在触发超时后输出数据。
这对您的代码意味着什么?这意味着您现在需要监视超时而不是在每次迭代中输出元组。我想你的 mapWithState
看起来应该是这样的:
def updateState(key: String,
value: Option[(Long, Long, Long, List[String])],
state: State[(Long, Long, Long, List[String])]): Option[(Long, Long, Long, List[String])] = {
def reduce(first: (Long, Long, Long, List[String]), second: (Long, Long, Long, List[String])) = {
(Math.min(first._1, second._1), Math.max(first._2, second._2), first._3 + second._3, first._4 ++ second._4)
}
value match {
case Some(currentValue) =>
val result = state
.getOption()
.map(currentState => reduce(currentState, currentValue))
.getOrElse(currentValue)
state.update(result)
None
case _ if state.isTimingOut() => state.getOption()
}
}
这样,如果状态已经超时,你只输出一些外部的东西到流,否则你在状态内聚合它。
这意味着您的 Spark DStream 图表可以过滤掉所有未定义的值,并且只保留那些:
latestSessionInfo
.mapWithState(spec)
.filter(_.isDefined)
在 filter
之后,您将只有超时的状态。