kafka directstream dstream 地图不打印
kafka directstream dstream map does not print
我有这个简单的 Kafka Stream
val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topicsSet)
// Each Kafka message is a flight
val flights = messages.map(_._2)
flights.foreachRDD( rdd => {
println("--- New RDD with " + rdd.partitions.length + " partitions and " + rdd.count() + " flight records");
rdd.map { flight => {
val flightRows = FlightParser.parse(flight)
println ("Parsed num rows: " + flightRows)
}
}
})
ssc.start()
ssc.awaitTermination()
Kafka 有消息,Spark Streaming 可以将它们作为 RDD。但是我代码中的第二个 println 没有打印任何东西。我在本地 [2] 模式下 运行 时查看驱动程序控制台日志,在 yarn-client 模式下 运行 时检查纱线日志。
我错过了什么?
而不是 rdd.map,以下代码在 spark 驱动程序控制台中打印良好:
for(flight <- rdd.collect().toArray) {
val flightRows = FlightParser.parse(flight)
println ("Parsed num rows: " + flightRows)
}
但我担心这个飞行对象的处理可能发生在 spark 驱动程序项目中,而不是执行程序中。如有不妥请指正
谢谢
rdd.map
是惰性变换。除非对该 RDD 调用操作,否则它不会具体化。
在这种特定情况下,我们可以使用 rdd.foreach
,这是 RDD 上最通用的操作之一,让我们可以访问 RDD 中的每个元素。
flights.foreachRDD{ rdd =>
rdd.foreach { flight =>
val flightRows = FlightParser.parse(flight)
println ("Parsed num rows: " + flightRows) // prints on the stdout of each executor independently
}
}
鉴于此RDD动作是在执行器中执行的,我们将在执行器的STDOUT中找到println输出。
如果您想在驱动程序上打印数据,您可以在 DStream.foreachRDD
闭包中 collect
RDD 的数据。
flights.foreachRDD{ rdd =>
val allFlights = rdd.collect()
println(allFlights.mkString("\n")) // prints to the stdout of the driver
}
我有这个简单的 Kafka Stream
val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topicsSet)
// Each Kafka message is a flight
val flights = messages.map(_._2)
flights.foreachRDD( rdd => {
println("--- New RDD with " + rdd.partitions.length + " partitions and " + rdd.count() + " flight records");
rdd.map { flight => {
val flightRows = FlightParser.parse(flight)
println ("Parsed num rows: " + flightRows)
}
}
})
ssc.start()
ssc.awaitTermination()
Kafka 有消息,Spark Streaming 可以将它们作为 RDD。但是我代码中的第二个 println 没有打印任何东西。我在本地 [2] 模式下 运行 时查看驱动程序控制台日志,在 yarn-client 模式下 运行 时检查纱线日志。
我错过了什么?
而不是 rdd.map,以下代码在 spark 驱动程序控制台中打印良好:
for(flight <- rdd.collect().toArray) {
val flightRows = FlightParser.parse(flight)
println ("Parsed num rows: " + flightRows)
}
但我担心这个飞行对象的处理可能发生在 spark 驱动程序项目中,而不是执行程序中。如有不妥请指正
谢谢
rdd.map
是惰性变换。除非对该 RDD 调用操作,否则它不会具体化。
在这种特定情况下,我们可以使用 rdd.foreach
,这是 RDD 上最通用的操作之一,让我们可以访问 RDD 中的每个元素。
flights.foreachRDD{ rdd =>
rdd.foreach { flight =>
val flightRows = FlightParser.parse(flight)
println ("Parsed num rows: " + flightRows) // prints on the stdout of each executor independently
}
}
鉴于此RDD动作是在执行器中执行的,我们将在执行器的STDOUT中找到println输出。
如果您想在驱动程序上打印数据,您可以在 DStream.foreachRDD
闭包中 collect
RDD 的数据。
flights.foreachRDD{ rdd =>
val allFlights = rdd.collect()
println(allFlights.mkString("\n")) // prints to the stdout of the driver
}