在 dstream 驱动程序中从 RDD 收集结果
Collect results from RDDs in a dstream driver program
我在驱动程序中有这个函数,它从 rdds 收集结果到一个数组中并将它发送回去。然而,即使 RDD(在 dstream 中)有数据,该函数返回一个空数组...我做错了什么?
def runTopFunction() : Array[(String, Int)] = {
val topSearches = some function....
val summary = new ArrayBuffer[(String,Int)]()
topSearches.foreachRDD(rdd => {
summary = summary.++(rdd.collect())
})
return summary.toArray
}
因此,虽然 foreachRDD
会执行您想要执行的操作,但它也是非阻塞的,这意味着它不会等到所有流都被处理完。由于您在调用 foreachRDD
后立即在缓冲区上调用 toArray
,因此还没有处理任何元素。
DStream.forEachRDD
是对给定 DStream
的操作,将安排在每个流批处理间隔执行。这是稍后要执行的作业的声明式构造。
不支持以这种方式累加值,因为当 Dstream.forEachRDD 只是说 "do this on each iteration" 时,周围的累加代码会立即执行,导致一个空数组。
根据 summary
数据在计算后发生的情况,关于如何实现它的选项很少:
- 如果数据需要由另一个进程检索,请使用共享的线程安全结构。优先级队列非常适合 top-k 使用。
- 如果要存储数据(fs, db),将
topSearches
函数应用到dstream后,直接写入存储即可。
我在驱动程序中有这个函数,它从 rdds 收集结果到一个数组中并将它发送回去。然而,即使 RDD(在 dstream 中)有数据,该函数返回一个空数组...我做错了什么?
def runTopFunction() : Array[(String, Int)] = {
val topSearches = some function....
val summary = new ArrayBuffer[(String,Int)]()
topSearches.foreachRDD(rdd => {
summary = summary.++(rdd.collect())
})
return summary.toArray
}
因此,虽然 foreachRDD
会执行您想要执行的操作,但它也是非阻塞的,这意味着它不会等到所有流都被处理完。由于您在调用 foreachRDD
后立即在缓冲区上调用 toArray
,因此还没有处理任何元素。
DStream.forEachRDD
是对给定 DStream
的操作,将安排在每个流批处理间隔执行。这是稍后要执行的作业的声明式构造。
不支持以这种方式累加值,因为当 Dstream.forEachRDD 只是说 "do this on each iteration" 时,周围的累加代码会立即执行,导致一个空数组。
根据 summary
数据在计算后发生的情况,关于如何实现它的选项很少:
- 如果数据需要由另一个进程检索,请使用共享的线程安全结构。优先级队列非常适合 top-k 使用。
- 如果要存储数据(fs, db),将
topSearches
函数应用到dstream后,直接写入存储即可。