批次之间的 Spark 流数据共享
Spark streaming data sharing between batches
Spark streaming 以微批处理数据。
每个间隔数据使用 RDD 并行处理,每个间隔之间没有任何数据共享。
但我的用例需要在间隔之间共享数据。
考虑 Network WordCount 示例,该示例生成该时间间隔内收到的所有单词的计数。
我将如何计算以下字数?
单词"hadoop"和"spark"与之前间隔计数的相对计数
所有其他词的正常字数。
注意:UpdateStateByKey 执行有状态处理,但这会将功能应用于每条记录而不是特定记录。
因此,UpdateStateByKey 不符合此要求。
更新:
考虑以下示例
区间-1
输入:
Sample Input with Hadoop and Spark on Hadoop
输出:
hadoop 2
sample 1
input 1
with 1
and 1
spark 1
on 1
区间-2
输入:
Another Sample Input with Hadoop and Spark on Hadoop and another hadoop another spark spark
输出:
another 3
hadoop 1
spark 2
and 2
sample 1
input 1
with 1
on 1
解释:
第一个间隔给出了所有单词的正常字数。
在第二个时间间隔内 hadoop 发生了 3 次但输出应该是 1 (3-2)
火花发生了 3 次,但输出应该是 2 (3-1)
对于所有其他词,它应该给出正常的字数。
因此,在处理第二个间隔数据时,它应该具有 hadoop 和 spark
第一个间隔的字数
这是一个带有插图的简单示例。
在实际用例中,需要数据共享的字段是RDD元素(RDD)的一部分,需要跟踪的值非常多。
即,在此示例中,像 hadoop 和 spark 关键字这样的关键字将近 100k 个要跟踪的关键字。
Apache Storm 中的类似用例:
这可以通过 "remembering" 接收到最后一个 RDD 并使用左连接将该数据与下一个流批次合并。我们利用 streamingContext.remember
使流式进程生成的 RDD 能够在我们需要的时候保留。
我们利用 dstream.transform
是在驱动程序上执行的操作这一事实,因此我们可以访问所有本地对象定义。特别是我们想用每批所需的值更新对最后一个 RDD 的可变引用。
可能一段代码使这个想法更清晰:
// configure the streaming context to remember the RDDs produced
// choose at least 2x the time of the streaming interval
ssc.remember(xx Seconds)
// Initialize the "currentData" with an empty RDD of the expected type
var currentData: RDD[(String, Int)] = sparkContext.emptyRDD
// classic word count
val w1dstream = dstream.map(elem => (elem,1))
val count = w1dstream.reduceByKey(_ + _)
// Here's the key to make this work. Look how we update the value of the last RDD after using it.
val diffCount = count.transform{ rdd =>
val interestingKeys = Set("hadoop", "spark")
val interesting = rdd.filter{case (k,v) => interestingKeys(k)}
val countDiff = rdd.leftOuterJoin(currentData).map{case (k,(v1,v2)) => (k,v1-v2.getOrElse(0))}
currentData = interesting
countDiff
}
diffCount.print()
Spark streaming 以微批处理数据。
每个间隔数据使用 RDD 并行处理,每个间隔之间没有任何数据共享。
但我的用例需要在间隔之间共享数据。
考虑 Network WordCount 示例,该示例生成该时间间隔内收到的所有单词的计数。
我将如何计算以下字数?
单词"hadoop"和"spark"与之前间隔计数的相对计数
所有其他词的正常字数。
注意:UpdateStateByKey 执行有状态处理,但这会将功能应用于每条记录而不是特定记录。
因此,UpdateStateByKey 不符合此要求。
更新:
考虑以下示例
区间-1
输入:
Sample Input with Hadoop and Spark on Hadoop
输出:
hadoop 2
sample 1
input 1
with 1
and 1
spark 1
on 1
区间-2
输入:
Another Sample Input with Hadoop and Spark on Hadoop and another hadoop another spark spark
输出:
another 3
hadoop 1
spark 2
and 2
sample 1
input 1
with 1
on 1
解释:
第一个间隔给出了所有单词的正常字数。
在第二个时间间隔内 hadoop 发生了 3 次但输出应该是 1 (3-2)
火花发生了 3 次,但输出应该是 2 (3-1)
对于所有其他词,它应该给出正常的字数。
因此,在处理第二个间隔数据时,它应该具有 hadoop 和 spark
第一个间隔的字数这是一个带有插图的简单示例。
在实际用例中,需要数据共享的字段是RDD元素(RDD)的一部分,需要跟踪的值非常多。
即,在此示例中,像 hadoop 和 spark 关键字这样的关键字将近 100k 个要跟踪的关键字。
Apache Storm 中的类似用例:
这可以通过 "remembering" 接收到最后一个 RDD 并使用左连接将该数据与下一个流批次合并。我们利用 streamingContext.remember
使流式进程生成的 RDD 能够在我们需要的时候保留。
我们利用 dstream.transform
是在驱动程序上执行的操作这一事实,因此我们可以访问所有本地对象定义。特别是我们想用每批所需的值更新对最后一个 RDD 的可变引用。
可能一段代码使这个想法更清晰:
// configure the streaming context to remember the RDDs produced
// choose at least 2x the time of the streaming interval
ssc.remember(xx Seconds)
// Initialize the "currentData" with an empty RDD of the expected type
var currentData: RDD[(String, Int)] = sparkContext.emptyRDD
// classic word count
val w1dstream = dstream.map(elem => (elem,1))
val count = w1dstream.reduceByKey(_ + _)
// Here's the key to make this work. Look how we update the value of the last RDD after using it.
val diffCount = count.transform{ rdd =>
val interestingKeys = Set("hadoop", "spark")
val interesting = rdd.filter{case (k,v) => interestingKeys(k)}
val countDiff = rdd.leftOuterJoin(currentData).map{case (k,(v1,v2)) => (k,v1-v2.getOrElse(0))}
currentData = interesting
countDiff
}
diffCount.print()