如何将多个RDD的DStream转换为单个RDD

How to convert DStream of number of RDDs to Single RDD

基本上我使用单个 Spark Streaming 消费者[直接方法]从多个 kafka 主题消费数据。

val dStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topicsSet).map(_._2)

批次间隔为 30 Seconds

我有几个问题。

  1. 当我在 DStream 上调用 foreachRDD 时,DStream 会包含多个 RDD 而不是单个 RDD 吗?每个主题会创建单独的 RDD 吗??
  2. 如果是,我想将所有的RDD合并为一个RDD,然后处理数据。我该怎么做?
  3. 如果我的处理时间超过batch interval,DStream是否会包含多个RDD?

我尝试使用以下方式将 DStream RDD 联合到单个 RDD。首先我的理解正确吗?如果DStream总是returns单个RDD,那么下面的代码就没有必要了。

示例代码:

var dStreamRDDList = new ListBuffer[RDD[String]]
dStream.foreachRDD(rdd =>
        {
            dStreamRDDList += rdd
        })
val joinedRDD = ssc.sparkContext.union(dStreamRDDList).cache()

//THEN PROCESS USING joinedRDD
//Convert joinedRDD to DF, then apply aggregate operations using DF API.

Will the DStream contains multiple RDD's instead of Single RDD when i call foreachRDD on DStream? will each topic create separate RDD?

没有。即使您有多个主题,在任何给定的批处理间隔内您也将拥有一个 RDD。

If my processing time is more than batch interval, will the DStream contain more than one RDDs?

不,如果您的处理时间比批处理时间长,那么所有要做的就是读取主题偏移量。只有在上一个作业完成后,才会开始处理下一批。

作为旁注,请确保您确实需要使用 foreachRDD,或者如果可能 you're misusing the DStream API(免责声明:我是 post 的作者)