Spark Streaming Sliding Window 最大值和最小值
Spark Streaming Sliding Window max and min
我是Spark初学者;我正在处理 spark 流用例,我收到一条 json 消息,每个 json 消息都有一个属性 'value',在解析 json 后它是 double 我得到一个 Array[Double] .我想找出最后 15 秒的最大值(值)和最小值(值),滑动 window 为 2 秒。
这是我的代码。
val record = KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topicMap, StorageLevel.MEMORY_ONLY_SER_2)
val lines=record.map(_._2)
val valueDtsream:DStream[Array[Double]]=lines.map { jsonRecord => parseJson(jsonRecord) }
.window(Seconds(15),Seconds(2))
valueDtsream.foreachRDD
{
rdd =>
if (!rdd.partitions.isEmpty)
{
//code to find min and max
}
}
ssc.start()
ssc.awaitTermination()
尝试:
valueDtsream.transform( rdd => {
val stats = rdd.flatMap(x => x).stats
rdd.sparkContext.parallelize(Seq((stats.min, stats.max)))
})
我是Spark初学者;我正在处理 spark 流用例,我收到一条 json 消息,每个 json 消息都有一个属性 'value',在解析 json 后它是 double 我得到一个 Array[Double] .我想找出最后 15 秒的最大值(值)和最小值(值),滑动 window 为 2 秒。 这是我的代码。
val record = KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topicMap, StorageLevel.MEMORY_ONLY_SER_2)
val lines=record.map(_._2)
val valueDtsream:DStream[Array[Double]]=lines.map { jsonRecord => parseJson(jsonRecord) }
.window(Seconds(15),Seconds(2))
valueDtsream.foreachRDD
{
rdd =>
if (!rdd.partitions.isEmpty)
{
//code to find min and max
}
}
ssc.start()
ssc.awaitTermination()
尝试:
valueDtsream.transform( rdd => {
val stats = rdd.flatMap(x => x).stats
rdd.sparkContext.parallelize(Seq((stats.min, stats.max)))
})