如何将多个RDD的DStream转换为单个RDD
How to convert DStream of number of RDDs to Single RDD
基本上我使用单个 Spark Streaming 消费者[直接方法]从多个 kafka 主题消费数据。
val dStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topicsSet).map(_._2)
批次间隔为 30 Seconds
。
我有几个问题。
- 当我在 DStream 上调用 foreachRDD 时,DStream 会包含多个 RDD 而不是单个 RDD 吗?每个主题会创建单独的 RDD 吗??
- 如果是,我想将所有的RDD合并为一个RDD,然后处理数据。我该怎么做?
- 如果我的处理时间超过batch interval,DStream是否会包含多个RDD?
我尝试使用以下方式将 DStream RDD 联合到单个 RDD。首先我的理解正确吗?如果DStream总是returns单个RDD,那么下面的代码就没有必要了。
示例代码:
var dStreamRDDList = new ListBuffer[RDD[String]]
dStream.foreachRDD(rdd =>
{
dStreamRDDList += rdd
})
val joinedRDD = ssc.sparkContext.union(dStreamRDDList).cache()
//THEN PROCESS USING joinedRDD
//Convert joinedRDD to DF, then apply aggregate operations using DF API.
Will the DStream contains multiple RDD's instead of Single RDD when i call foreachRDD on DStream? will each topic create separate RDD?
没有。即使您有多个主题,在任何给定的批处理间隔内您也将拥有一个 RDD。
If my processing time is more than batch interval, will the DStream contain more than one RDDs?
不,如果您的处理时间比批处理时间长,那么所有要做的就是读取主题偏移量。只有在上一个作业完成后,才会开始处理下一批。
作为旁注,请确保您确实需要使用 foreachRDD
,或者如果可能 you're misusing the DStream API(免责声明:我是 post 的作者)
基本上我使用单个 Spark Streaming 消费者[直接方法]从多个 kafka 主题消费数据。
val dStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topicsSet).map(_._2)
批次间隔为 30 Seconds
。
我有几个问题。
- 当我在 DStream 上调用 foreachRDD 时,DStream 会包含多个 RDD 而不是单个 RDD 吗?每个主题会创建单独的 RDD 吗??
- 如果是,我想将所有的RDD合并为一个RDD,然后处理数据。我该怎么做?
- 如果我的处理时间超过batch interval,DStream是否会包含多个RDD?
我尝试使用以下方式将 DStream RDD 联合到单个 RDD。首先我的理解正确吗?如果DStream总是returns单个RDD,那么下面的代码就没有必要了。
示例代码:
var dStreamRDDList = new ListBuffer[RDD[String]]
dStream.foreachRDD(rdd =>
{
dStreamRDDList += rdd
})
val joinedRDD = ssc.sparkContext.union(dStreamRDDList).cache()
//THEN PROCESS USING joinedRDD
//Convert joinedRDD to DF, then apply aggregate operations using DF API.
Will the DStream contains multiple RDD's instead of Single RDD when i call foreachRDD on DStream? will each topic create separate RDD?
没有。即使您有多个主题,在任何给定的批处理间隔内您也将拥有一个 RDD。
If my processing time is more than batch interval, will the DStream contain more than one RDDs?
不,如果您的处理时间比批处理时间长,那么所有要做的就是读取主题偏移量。只有在上一个作业完成后,才会开始处理下一批。
作为旁注,请确保您确实需要使用 foreachRDD
,或者如果可能 you're misusing the DStream API(免责声明:我是 post 的作者)