是否可以使用 Spark Streaming 实时更新值?
Is is possible to real time update a value with spark streaming?
假设我有一个 Double 值流,我想每十秒计算一次平均值。我怎样才能有一个滑动 window 不需要重新计算平均值而是更新它,比方说,删除最旧的十秒的部分并仅添加新的 10 秒值?
TL;DR :使用 reduceByWindow
及其两个函数参数(跳转到代码片段的最后一段)
对您的问题有两种解释,一种是具体的(我如何获得一个 运行 一个小时的平均值,每 2 秒更新一次),另一种是一般的(我如何获得更新的计算以稀疏的方式陈述)。这是一般的答案。
首先,请注意有一种方法可以表示您的数据,这样您的平均更新很容易计算,基于 windowed DStream:这将您的数据表示为流的增量构造,最大共享。但是重新计算每批的平均值在计算上效率较低——正如您所指出的。
如果您确实想对可逆的复杂状态计算进行更新,但又不想触及流的构造,可以使用 updateStateByKey
– 但 Spark 无法帮助您在流中反映你计算的增量方面,你必须自己管理它。
在这里,你确实有一些简单且可逆的东西,而且你没有键的概念。您可以使用 reduceByWindow
及其逆归约参数,使用可以让您计算增量均值的常用函数。
val myInitialDStream: DStream[Float]
val myDStreamWithCount: DStream[(Float, Long)] =
myInitialDStream.map((x) => (x, 1L))
def addOneBatchToMean(previousMean: (Float, Long), newBatch: (Float, Long)): (Float, Long) =
(previousMean._1 + newBatch._1, previousMean._2 + newBatch._2)
def removeOneBatchToMean(previousMean: (Float, Long), oldBatch: (Float, Long)): (Float, Long) =
(previousMean._1 - oldBatch._1, previousMean._2 - oldBatch._2)
val runningMeans = myDStreamWithCount.reduceByWindow(addOneBatchToMean, removeOneBatchToMean, Durations.seconds(3600), Duractions.seconds(2))
你得到一个单元素流 RDD
s,每个元素包含一对 (m, n),其中 m 是你的 运行 1h-window 的总和n是1h-window中的元素个数。只需 return(或 map
)m/n 即可得到平均值。
假设我有一个 Double 值流,我想每十秒计算一次平均值。我怎样才能有一个滑动 window 不需要重新计算平均值而是更新它,比方说,删除最旧的十秒的部分并仅添加新的 10 秒值?
TL;DR :使用 reduceByWindow
及其两个函数参数(跳转到代码片段的最后一段)
对您的问题有两种解释,一种是具体的(我如何获得一个 运行 一个小时的平均值,每 2 秒更新一次),另一种是一般的(我如何获得更新的计算以稀疏的方式陈述)。这是一般的答案。
首先,请注意有一种方法可以表示您的数据,这样您的平均更新很容易计算,基于 windowed DStream:这将您的数据表示为流的增量构造,最大共享。但是重新计算每批的平均值在计算上效率较低——正如您所指出的。
如果您确实想对可逆的复杂状态计算进行更新,但又不想触及流的构造,可以使用 updateStateByKey
– 但 Spark 无法帮助您在流中反映你计算的增量方面,你必须自己管理它。
在这里,你确实有一些简单且可逆的东西,而且你没有键的概念。您可以使用 reduceByWindow
及其逆归约参数,使用可以让您计算增量均值的常用函数。
val myInitialDStream: DStream[Float]
val myDStreamWithCount: DStream[(Float, Long)] =
myInitialDStream.map((x) => (x, 1L))
def addOneBatchToMean(previousMean: (Float, Long), newBatch: (Float, Long)): (Float, Long) =
(previousMean._1 + newBatch._1, previousMean._2 + newBatch._2)
def removeOneBatchToMean(previousMean: (Float, Long), oldBatch: (Float, Long)): (Float, Long) =
(previousMean._1 - oldBatch._1, previousMean._2 - oldBatch._2)
val runningMeans = myDStreamWithCount.reduceByWindow(addOneBatchToMean, removeOneBatchToMean, Durations.seconds(3600), Duractions.seconds(2))
你得到一个单元素流 RDD
s,每个元素包含一对 (m, n),其中 m 是你的 运行 1h-window 的总和n是1h-window中的元素个数。只需 return(或 map
)m/n 即可得到平均值。