在 Spark Streaming 中将 RDD 打印到控制台

Print RDD out to console in spark streaming

我编写了一个 spark 流应用程序来使用 KafkaUtils 从 Kafka 接收数据,我想做的是打印出从 Kafka 接收到的数据。这是我的代码(我使用 spark-submit 来执行我的 spark streaming 作业):

val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topicSet)
messages.print()

当我运行这个时,它工作得很好。如果在 Kafka 生产者中输入是 a,b,c,我可以从 Spark streaming 中得到如下结果:

Time: 1476481700000 ms

-------------------------------------------
(null,a)
(null,b)
(null,c)

但是如果我加一行来统计行数,messages.print()就不行了。代码如下:

val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topicSet)
messages.print()
messages.count().print()

我得到以下结果:

-------------------------------------------
Time: 1476481800000 ms
-------------------------------------------
4

只打印计数,无法打印数据。 我的问题是为什么我添加 messages.count.print().
messages.print() 不会被执行 另一个问题是 null 在元组 (null, a)(null, b)(null, c).

中代表什么

print() 没有问题,它将打印两条消息并进行计数,如下所示。滚动并检查您的日志。

-------------------------------------------
Time: 1476481700000 ms
-------------------------------------------
(null,a)
(null,b)
(null,c)

-------------------------------------------
Time: 1476481800000 ms
-------------------------------------------
4

KafkaUtils.createDirectStream 方法 returns <Kafka topic, Kafka message> 的 DStream。检查与主题相关的 and this post 是否为空。

您的代码应该可以工作,但给您一个 alternative.But 这种方法仅用于测试或学习。无需执行两个 actions ,您只需执行一个 action

即可实现最终目标
    val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topicSet)
    //Cache your RDD before you perform any heavyweight operations. 
    messages.cache()
    val result = messages.collect();
    println(result.size + " size")
    result.foreach { input => println(input) }