在 dstream 驱动程序中从 RDD 收集结果

Collect results from RDDs in a dstream driver program

我在驱动程序中有这个函数,它从 rdds 收集结果到一个数组中并将它发送回去。然而,即使 RDD(在 dstream 中)有数据,该函数返回一个空数组...我做错了什么?

def runTopFunction() : Array[(String, Int)] = {
        val topSearches = some function....
        val summary = new ArrayBuffer[(String,Int)]()
        topSearches.foreachRDD(rdd => {
            summary = summary.++(rdd.collect())
        })    

    return summary.toArray
}

因此,虽然 foreachRDD 会执行您想要执行的操作,但它也是非阻塞的,这意味着它不会等到所有流都被处理完。由于您在调用 foreachRDD 后立即在缓冲区上调用 toArray,因此还没有处理任何元素。

DStream.forEachRDD 是对给定 DStream 的操作,将安排在每个流批处理间隔执行。这是稍后要执行的作业的声明式构造。

不支持以这种方式累加值,因为当 Dstream.forEachRDD 只是说 "do this on each iteration" 时,周围的累加代码会立即执行,导致一个空数组。

根据 summary 数据在计算后发生的情况,关于如何实现它的选项很少:

  • 如果数据需要由另一个进程检索,请使用共享的线程安全结构。优先级队列非常适合 top-k 使用。
  • 如果要存储数据(fs, db),将topSearches函数应用到dstream后,直接写入存储即可。