是否可以使用 Spark Streaming 实时更新值?

Is is possible to real time update a value with spark streaming?

假设我有一个 Double 值流,我想每十秒计算一次平均值。我怎样才能有一个滑动 window 不需要重新计算平均值而是更新它,比方说,删除最旧的十秒的部分并仅添加新的 10 秒值?

TL;DR :使用 reduceByWindow 及其两个函数参数(跳转到代码片段的最后一段)

对您的问题有两种解释,一种是具体的(我如何获得一个 运行 一个小时的平均值,每 2 秒更新一次),另一种是一般的(我如何获得更新的计算以稀疏的方式陈述)。这是一般的答案。

首先,请注意有一种方法可以表示您的数据,这样您的平均更新很容易计算,基于 windowed DStream:这将您的数据表示为流的增量构造,最大共享。但是重新计算每批的平均值在计算上效率较低——正如您所指出的。

如果您确实想对可逆的复杂状态计算进行更新,但又不想触及流的构造,可以使用 updateStateByKey – 但 Spark 无法帮助您在流中反映你计算的增量方面,你必须自己管理它。

在这里,你确实有一些简单且可逆的东西,而且你没有键的概念。您可以使用 reduceByWindow 及其逆归约参数,使用可以让您计算增量均值的常用函数。

val myInitialDStream: DStream[Float]

val myDStreamWithCount: DStream[(Float, Long)] = 
  myInitialDStream.map((x) => (x, 1L))

def addOneBatchToMean(previousMean: (Float, Long), newBatch: (Float, Long)): (Float, Long) = 
  (previousMean._1 + newBatch._1, previousMean._2 + newBatch._2)

def removeOneBatchToMean(previousMean: (Float, Long), oldBatch: (Float, Long)): (Float, Long) = 
  (previousMean._1 - oldBatch._1, previousMean._2 - oldBatch._2)

val runningMeans = myDStreamWithCount.reduceByWindow(addOneBatchToMean, removeOneBatchToMean, Durations.seconds(3600), Duractions.seconds(2))

你得到一个单元素流 RDDs,每个元素包含一对 (m, n),其中 m 是你的 运行 1h-window 的总和n是1h-window中的元素个数。只需 return(或 map)m/n 即可得到平均值。