Spark 流式窗口输出
Spark Streaming Windowing output
我正在开发一个 spark 流应用程序,我需要在其中打印 json 属性的最小值、最大值,该属性应该每 20 秒在 window 上打印最小值、最大值2秒的滑动window。
基本上(对于 POC)我想在作业组 sparkContext 的 Spark UI 上打印 min、max。
SetJobGroup ("count-min-max", "count-min-max value of quality attribute").
这应该在 Spark UI 显示器上每 20 秒显示一次。
下面是我的代码,我可以获得最小值、最大值、计数值,但是每 2 秒执行一次打印,这是流式处理批处理间隔,而不是 window 20 秒。
val ssc = new StreamingContext(sparkContext, Seconds(2))
val record = KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topicMap, StorageLevel.MEMORY_ONLY_SER_2)
val lines=record.map(_._2)
//val jsonCounts=lines.map { jsonRecord => parseJson(jsonRecord) }.map { x => x.mkString("\n") }.print
val valueDtsream:DStream[Array[Double]]=lines.map { jsonRecord => parseJson(jsonRecord) }
.window(Seconds(20),Seconds(2))
valueDtsream.foreachRDD
{
rdd =>
if (!rdd.partitions.isEmpty)
{
val stats = rdd.flatMap(x => x)
println(stats.count().toString()+"-"+stats.min().toString()+"-"+stats.max().toString)
}
}
ssc.start()
ssc.awaitTermination()
我认为您对 slideInterval
和 windowLength
感到困惑。在 window(windowLength, slideInterval)
:
windowLength
是window的长度,意思是window在计算时应该考虑多少个数据区间。
slideInterval
是 window 在 window 计算完成后移动的间隔数。
如果我正确理解你的问题,你应该将其编辑为:.window(Seconds(x),Seconds(20))
。
我正在开发一个 spark 流应用程序,我需要在其中打印 json 属性的最小值、最大值,该属性应该每 20 秒在 window 上打印最小值、最大值2秒的滑动window。 基本上(对于 POC)我想在作业组 sparkContext 的 Spark UI 上打印 min、max。
SetJobGroup ("count-min-max", "count-min-max value of quality attribute").
这应该在 Spark UI 显示器上每 20 秒显示一次。
下面是我的代码,我可以获得最小值、最大值、计数值,但是每 2 秒执行一次打印,这是流式处理批处理间隔,而不是 window 20 秒。
val ssc = new StreamingContext(sparkContext, Seconds(2))
val record = KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topicMap, StorageLevel.MEMORY_ONLY_SER_2)
val lines=record.map(_._2)
//val jsonCounts=lines.map { jsonRecord => parseJson(jsonRecord) }.map { x => x.mkString("\n") }.print
val valueDtsream:DStream[Array[Double]]=lines.map { jsonRecord => parseJson(jsonRecord) }
.window(Seconds(20),Seconds(2))
valueDtsream.foreachRDD
{
rdd =>
if (!rdd.partitions.isEmpty)
{
val stats = rdd.flatMap(x => x)
println(stats.count().toString()+"-"+stats.min().toString()+"-"+stats.max().toString)
}
}
ssc.start()
ssc.awaitTermination()
我认为您对 slideInterval
和 windowLength
感到困惑。在 window(windowLength, slideInterval)
:
windowLength
是window的长度,意思是window在计算时应该考虑多少个数据区间。slideInterval
是 window 在 window 计算完成后移动的间隔数。
如果我正确理解你的问题,你应该将其编辑为:.window(Seconds(x),Seconds(20))
。