批次之间的 Spark 流数据共享

Spark streaming data sharing between batches

Spark streaming 以微批处理数据。

每个间隔数据使用 RDD 并行处理,每个间隔之间没有任何数据共享。

但我的用例需要在间隔之间共享数据。

考虑 Network WordCount 示例,该示例生成该时间间隔内收到的所有单词的计数。

我将如何计算以下字数?

注意:UpdateStateByKey 执行有状态处理,但这会将功能应用于每条记录而不是特定记录。

因此,UpdateStateByKey 不符合此要求。

更新:

考虑以下示例

区间-1

输入:

Sample Input with Hadoop and Spark on Hadoop

输出:

hadoop  2
sample  1
input   1
with    1
and 1
spark   1
on  1

区间-2

输入:

Another Sample Input with Hadoop and Spark on Hadoop and another hadoop another spark spark

输出:

another 3
hadoop  1
spark   2
and 2
sample  1
input   1
with    1
on  1

解释:

第一个间隔给出了所有单词的正常字数。

在第二个时间间隔内 hadoop 发生了 3 次但输出应该是 1 (3-2)

火花发生了 3 次,但输出应该是 2 (3-1)

对于所有其他词,它应该给出正常的字数。

因此,在处理第二个间隔数据时,它应该具有 hadoopspark

第一个间隔的字数

这是一个带有插图的简单示例。

在实际用例中,需要数据共享的字段是RDD元素(RDD)的一部分,需要跟踪的值非常多。

即,在此示例中,像 hadoop 和 spark 关键字这样的关键字将近 100k 个要跟踪的关键字。

Apache Storm 中的类似用例:

Storm TransactionalWords

这可以通过 "remembering" 接收到最后一个 RDD 并使用左连接将该数据与下一个流批次合并。我们利用 streamingContext.remember 使流式进程生成的 RDD 能够在我们需要的时候保留。

我们利用 dstream.transform 是在驱动程序上执行的操作这一事实,因此我们可以访问所有本地对象定义。特别是我们想用每批所需的值更新对最后一个 RDD 的可变引用。

可能一段代码使这个想法更清晰:

// configure the streaming context to remember the RDDs produced
// choose at least 2x the time of the streaming interval
ssc.remember(xx Seconds)  

// Initialize the "currentData" with an empty RDD of the expected type
var currentData: RDD[(String, Int)] = sparkContext.emptyRDD

// classic word count
val w1dstream = dstream.map(elem => (elem,1))    
val count = w1dstream.reduceByKey(_ + _)    

// Here's the key to make this work. Look how we update the value of the last RDD after using it. 
val diffCount = count.transform{ rdd => 
                val interestingKeys = Set("hadoop", "spark")               
                val interesting = rdd.filter{case (k,v) => interestingKeys(k)}                                
                val countDiff = rdd.leftOuterJoin(currentData).map{case (k,(v1,v2)) => (k,v1-v2.getOrElse(0))}
                currentData = interesting
                countDiff                
               }

diffCount.print()