在 Spark Streaming 中将 RDD 打印到控制台
Print RDD out to console in spark streaming
我编写了一个 spark 流应用程序来使用 KafkaUtils 从 Kafka 接收数据,我想做的是打印出从 Kafka 接收到的数据。这是我的代码(我使用 spark-submit 来执行我的 spark streaming 作业):
val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topicSet)
messages.print()
当我运行这个时,它工作得很好。如果在 Kafka 生产者中输入是 a,b,c,我可以从 Spark streaming 中得到如下结果:
Time: 1476481700000 ms
-------------------------------------------
(null,a)
(null,b)
(null,c)
但是如果我加一行来统计行数,messages.print()
就不行了。代码如下:
val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topicSet)
messages.print()
messages.count().print()
我得到以下结果:
-------------------------------------------
Time: 1476481800000 ms
-------------------------------------------
4
只打印计数,无法打印数据。
我的问题是为什么我添加 messages.count.print()
.
后 messages.print()
不会被执行
另一个问题是 null 在元组 (null, a)(null, b)(null, c)
.
中代表什么
print() 没有问题,它将打印两条消息并进行计数,如下所示。滚动并检查您的日志。
-------------------------------------------
Time: 1476481700000 ms
-------------------------------------------
(null,a)
(null,b)
(null,c)
-------------------------------------------
Time: 1476481800000 ms
-------------------------------------------
4
KafkaUtils.createDirectStream 方法 returns <Kafka topic, Kafka message>
的 DStream。检查与主题相关的 and this post 是否为空。
您的代码应该可以工作,但给您一个 alternative.But 这种方法仅用于测试或学习。无需执行两个 actions
,您只需执行一个 action
即可实现最终目标
val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topicSet)
//Cache your RDD before you perform any heavyweight operations.
messages.cache()
val result = messages.collect();
println(result.size + " size")
result.foreach { input => println(input) }
我编写了一个 spark 流应用程序来使用 KafkaUtils 从 Kafka 接收数据,我想做的是打印出从 Kafka 接收到的数据。这是我的代码(我使用 spark-submit 来执行我的 spark streaming 作业):
val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topicSet)
messages.print()
当我运行这个时,它工作得很好。如果在 Kafka 生产者中输入是 a,b,c,我可以从 Spark streaming 中得到如下结果:
Time: 1476481700000 ms
-------------------------------------------
(null,a)
(null,b)
(null,c)
但是如果我加一行来统计行数,messages.print()
就不行了。代码如下:
val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topicSet)
messages.print()
messages.count().print()
我得到以下结果:
-------------------------------------------
Time: 1476481800000 ms
-------------------------------------------
4
只打印计数,无法打印数据。
我的问题是为什么我添加 messages.count.print()
.
后 messages.print()
不会被执行
另一个问题是 null 在元组 (null, a)(null, b)(null, c)
.
print() 没有问题,它将打印两条消息并进行计数,如下所示。滚动并检查您的日志。
-------------------------------------------
Time: 1476481700000 ms
-------------------------------------------
(null,a)
(null,b)
(null,c)
-------------------------------------------
Time: 1476481800000 ms
-------------------------------------------
4
KafkaUtils.createDirectStream 方法 returns <Kafka topic, Kafka message>
的 DStream。检查与主题相关的
您的代码应该可以工作,但给您一个 alternative.But 这种方法仅用于测试或学习。无需执行两个 actions
,您只需执行一个 action
val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topicSet)
//Cache your RDD before you perform any heavyweight operations.
messages.cache()
val result = messages.collect();
println(result.size + " size")
result.foreach { input => println(input) }