Spark streaming if(!rdd.partitions.isEmpty) 不工作
Spark streaming if(!rdd.partitions.isEmpty) not working
我正在尝试从 kafka 服务器创建一个 dStream,然后对该流进行一些转换。如果流是空的 (if(!rdd.partitions.isEmpty)
),我已经包含了一个 catch;但是,即使没有事件发布到 kafka 主题,也永远不会到达 else
语句。
val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics)
stream.foreachRDD { rdd =>
if(!rdd.partitions.isEmpty) {
val message = rdd.map((x) => x._2).collect().toList.map(parser)
val val = message(0)
} else println("empty stream...")
ssc.start()
ssc.awaitTermination()
}
在使用 KafkaUtils.createDirectStream
而不是 createStream
时,我是否应该使用替代语句来检查流是否为空?
使用 RDD.isEmpty
而不是 RDD.partitions.isEmpty
,这会添加一个检查以查看底层分区是否确实包含元素:
stream.foreachRDD { rdd =>
if(!rdd.isEmpty) {
// Stuff
}
}
RDD.partitions.isEmpty
不起作用的原因是 RDD
中存在一个分区,但该分区本身是空的。但从 partitions
的角度来看,它是一个 Array[Partition]
,它不是空的。
我正在尝试从 kafka 服务器创建一个 dStream,然后对该流进行一些转换。如果流是空的 (if(!rdd.partitions.isEmpty)
),我已经包含了一个 catch;但是,即使没有事件发布到 kafka 主题,也永远不会到达 else
语句。
val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics)
stream.foreachRDD { rdd =>
if(!rdd.partitions.isEmpty) {
val message = rdd.map((x) => x._2).collect().toList.map(parser)
val val = message(0)
} else println("empty stream...")
ssc.start()
ssc.awaitTermination()
}
在使用 KafkaUtils.createDirectStream
而不是 createStream
时,我是否应该使用替代语句来检查流是否为空?
使用 RDD.isEmpty
而不是 RDD.partitions.isEmpty
,这会添加一个检查以查看底层分区是否确实包含元素:
stream.foreachRDD { rdd =>
if(!rdd.isEmpty) {
// Stuff
}
}
RDD.partitions.isEmpty
不起作用的原因是 RDD
中存在一个分区,但该分区本身是空的。但从 partitions
的角度来看,它是一个 Array[Partition]
,它不是空的。