如何在火花流中使用 updateStateByKey() 和 socketTextStream?

How to use updateStateByKey() with using socketTextStream in spark streaming?

第一次测试:

测试代码如下:

object StreamTest {

    def main(args: Array[String]) {
        val sc = new SparkContext
        val ssc = new StreamingContext(sc, Seconds(1))
        ssc.checkpoint("./checkpoint")

        val lines = ssc.socketTextStream("192.168.11.5", 9999, StorageLevel.MEMORY_ONLY_SER)
        val accStream = lines.map((_ , "")).updateStateByKey(updateFunc)
        accStream.print()
        ssc.start()
        ssc.awaitTermination()
    }
    def updateFunc: (Seq[String], Option[Int]) => Option[Int] = { case _ => Some(1) }
}

当我通过NetCat发送一个数据(只有一个)时,看截图:

结果为:

我的问题是:为什么一直打印结果?为什么不是一次? (我只向套接字客户端发送一个数据。)

第二次测试:

我又在测试了(设置spark streaming间隔时间为5秒):

发送数据:

结果是:

第三次测试:

使用ConstantInputDStream进行测试,代码如下:

object SparkStreaming {
    def main(args: Array[String]) {
        val sc = new SparkContext
        val ssc = new StreamingContext(sc, Seconds(1))
        ssc.checkpoint("./checkpoint")
        val seq = Seq("key")   //every 1 second send a "key"
        val rdd = ssc.sparkContext.parallelize(seq)
        //using ConstantInputDStream as inputDStream
        val inputDStream = new ConstantInputDStream(ssc, rdd)

        val map = inputDStream.map((_, "")).updateStateByKey(updateFunc)
        map.print
        ssc.start
        ssc.awaitTermination
    }

    def updateFunc: (Seq[String], Option[Int]) => Option[Int] = { case _ => Some(1) }
}

结果是:

第 3 次测试的结果与第 1 次测试的结果相同。

第一次测试,我只在第一秒发了一个"key"。

在第 3 次测试中,ConstantInputDStream 发送 "key" 每 1 秒 .

但为什么结果是一样的?所以使用 socketTextStream.

结果真的很奇怪

你能告诉我为什么吗?非常感谢!

那是因为 updateStateByKey 保存您的状态并且不会更新,除非处理新的提要。

updateStateByKey 的重点是在需要时保存和积累状态。在 updateStateByKey 之后,您的流是一组元组,其中包含更新函数中的键和 returned 值。它将保持密钥的状态,直到您从更新功能 return None 而不是 Some

你可以参考这个答案中的示例实现: