如何在火花流中使用 updateStateByKey() 和 socketTextStream?
How to use updateStateByKey() with using socketTextStream in spark streaming?
第一次测试:
测试代码如下:
object StreamTest {
def main(args: Array[String]) {
val sc = new SparkContext
val ssc = new StreamingContext(sc, Seconds(1))
ssc.checkpoint("./checkpoint")
val lines = ssc.socketTextStream("192.168.11.5", 9999, StorageLevel.MEMORY_ONLY_SER)
val accStream = lines.map((_ , "")).updateStateByKey(updateFunc)
accStream.print()
ssc.start()
ssc.awaitTermination()
}
def updateFunc: (Seq[String], Option[Int]) => Option[Int] = { case _ => Some(1) }
}
当我通过NetCat发送一个数据(只有一个)时,看截图:
结果为:
我的问题是:为什么一直打印结果?为什么不是一次? (我只向套接字客户端发送一个数据。)
第二次测试:
我又在测试了(设置spark streaming间隔时间为5秒):
发送数据:
结果是:
第三次测试:
使用ConstantInputDStream
进行测试,代码如下:
object SparkStreaming {
def main(args: Array[String]) {
val sc = new SparkContext
val ssc = new StreamingContext(sc, Seconds(1))
ssc.checkpoint("./checkpoint")
val seq = Seq("key") //every 1 second send a "key"
val rdd = ssc.sparkContext.parallelize(seq)
//using ConstantInputDStream as inputDStream
val inputDStream = new ConstantInputDStream(ssc, rdd)
val map = inputDStream.map((_, "")).updateStateByKey(updateFunc)
map.print
ssc.start
ssc.awaitTermination
}
def updateFunc: (Seq[String], Option[Int]) => Option[Int] = { case _ => Some(1) }
}
结果是:
第 3 次测试的结果与第 1 次测试的结果相同。
第一次测试,我只在第一秒发了一个"key"。
在第 3 次测试中,ConstantInputDStream 发送 "key" 每 1 秒 .
但为什么结果是一样的?所以使用 socketTextStream
.
结果真的很奇怪
你能告诉我为什么吗?非常感谢!
那是因为 updateStateByKey
保存您的状态并且不会更新,除非处理新的提要。
updateStateByKey
的重点是在需要时保存和积累状态。在 updateStateByKey
之后,您的流是一组元组,其中包含更新函数中的键和 returned 值。它将保持密钥的状态,直到您从更新功能 return None
而不是 Some
。
你可以参考这个答案中的示例实现:
第一次测试:
测试代码如下:
object StreamTest {
def main(args: Array[String]) {
val sc = new SparkContext
val ssc = new StreamingContext(sc, Seconds(1))
ssc.checkpoint("./checkpoint")
val lines = ssc.socketTextStream("192.168.11.5", 9999, StorageLevel.MEMORY_ONLY_SER)
val accStream = lines.map((_ , "")).updateStateByKey(updateFunc)
accStream.print()
ssc.start()
ssc.awaitTermination()
}
def updateFunc: (Seq[String], Option[Int]) => Option[Int] = { case _ => Some(1) }
}
当我通过NetCat发送一个数据(只有一个)时,看截图:
结果为:
我的问题是:为什么一直打印结果?为什么不是一次? (我只向套接字客户端发送一个数据。)
第二次测试:
我又在测试了(设置spark streaming间隔时间为5秒):
发送数据:
结果是:
第三次测试:
使用ConstantInputDStream
进行测试,代码如下:
object SparkStreaming {
def main(args: Array[String]) {
val sc = new SparkContext
val ssc = new StreamingContext(sc, Seconds(1))
ssc.checkpoint("./checkpoint")
val seq = Seq("key") //every 1 second send a "key"
val rdd = ssc.sparkContext.parallelize(seq)
//using ConstantInputDStream as inputDStream
val inputDStream = new ConstantInputDStream(ssc, rdd)
val map = inputDStream.map((_, "")).updateStateByKey(updateFunc)
map.print
ssc.start
ssc.awaitTermination
}
def updateFunc: (Seq[String], Option[Int]) => Option[Int] = { case _ => Some(1) }
}
结果是:
第 3 次测试的结果与第 1 次测试的结果相同。
第一次测试,我只在第一秒发了一个"key"。
在第 3 次测试中,ConstantInputDStream 发送 "key" 每 1 秒 .
但为什么结果是一样的?所以使用 socketTextStream
.
你能告诉我为什么吗?非常感谢!
那是因为 updateStateByKey
保存您的状态并且不会更新,除非处理新的提要。
updateStateByKey
的重点是在需要时保存和积累状态。在 updateStateByKey
之后,您的流是一组元组,其中包含更新函数中的键和 returned 值。它将保持密钥的状态,直到您从更新功能 return None
而不是 Some
。
你可以参考这个答案中的示例实现: