大约 1 分钟后,Spark Streaming 停止且没有错误
Spark Streaming gets stopped without errors after ~1 minute
当我为 Spart Streaming 作业制作 spark-submit
时,我可以看到它在大约 1 分钟内处于 运行ning,然后它以最终状态 SUCCEEDED
停止:
16/11/16 18:58:16 INFO yarn.Client: Application report for application_XXXX_XXX (state: RUNNING)
16/11/16 18:58:17 INFO yarn.Client: Application report for application_XXXX_XXX (state: FINISHED)
我不明白为什么它会停止,而我希望它在未定义的时间内 运行 并由从 Kafka 队列接收到的消息触发。在日志中,我可以看到所有 println
输出,并且没有错误。
这是代码的简短摘录:
val conf = new SparkConf().setAppName("MYTEST")
val sc = new SparkContext(conf)
sc.setCheckpointDir("~/checkpointDir")
val ssc = new StreamingContext(sc, Seconds(batch_interval_seconds))
val rootLogger = Logger.getRootLogger()
rootLogger.setLevel(Level.ERROR)
println("Dividing the topic into partitions.")
val inputKafkaTopicMap = inputKafkaTopic.split(",").map((_, kafkaNumThreads)).toMap
val messages = KafkaUtils.createStream(ssc, zkQuorum, group, inputKafkaTopicMap).map(_._2)
messages.foreachRDD(msg => {
msg.foreach(s => {
if (s != null) {
//val result = ... processing goes here
//println(result)
}
})
})
// Start the streaming context in the background.
ssc.start()
这是我的 spark-submit
命令:
/usr/bin/spark-submit --master yarn --deploy-mode cluster --driver-memory 10g --executor-memory 10g --num-executors 2 --conf "spark.executor.extraJavaOptions=-XX:+UseG1GC \
-XX:+AlwaysPreTouch" --class org.test.StreamingRunner test.jar param1 param2
当我打开资源管理器时,我看到没有作业是 RUNNING
并且 Spark Streaming 作业被标记为 FINISHED
。
您的代码缺少对 ssc.awaitTermination
的调用以阻止驱动程序线程。
不幸的是,没有简单的方法可以在控制台上查看 map
函数内部的打印输出,因为这些函数调用发生在 YARN 执行器内部。 Cloudera Manager 提供了一个不错的日志查看方式,如果您真的需要在驱动程序上收集它们,您可以写入 HDFS 中的某个位置,然后自己从那里抓取各种日志。如果您要跟踪的信息是纯数字的,您还可以考虑使用 Accumulator.
当我为 Spart Streaming 作业制作 spark-submit
时,我可以看到它在大约 1 分钟内处于 运行ning,然后它以最终状态 SUCCEEDED
停止:
16/11/16 18:58:16 INFO yarn.Client: Application report for application_XXXX_XXX (state: RUNNING)
16/11/16 18:58:17 INFO yarn.Client: Application report for application_XXXX_XXX (state: FINISHED)
我不明白为什么它会停止,而我希望它在未定义的时间内 运行 并由从 Kafka 队列接收到的消息触发。在日志中,我可以看到所有 println
输出,并且没有错误。
这是代码的简短摘录:
val conf = new SparkConf().setAppName("MYTEST")
val sc = new SparkContext(conf)
sc.setCheckpointDir("~/checkpointDir")
val ssc = new StreamingContext(sc, Seconds(batch_interval_seconds))
val rootLogger = Logger.getRootLogger()
rootLogger.setLevel(Level.ERROR)
println("Dividing the topic into partitions.")
val inputKafkaTopicMap = inputKafkaTopic.split(",").map((_, kafkaNumThreads)).toMap
val messages = KafkaUtils.createStream(ssc, zkQuorum, group, inputKafkaTopicMap).map(_._2)
messages.foreachRDD(msg => {
msg.foreach(s => {
if (s != null) {
//val result = ... processing goes here
//println(result)
}
})
})
// Start the streaming context in the background.
ssc.start()
这是我的 spark-submit
命令:
/usr/bin/spark-submit --master yarn --deploy-mode cluster --driver-memory 10g --executor-memory 10g --num-executors 2 --conf "spark.executor.extraJavaOptions=-XX:+UseG1GC \
-XX:+AlwaysPreTouch" --class org.test.StreamingRunner test.jar param1 param2
当我打开资源管理器时,我看到没有作业是 RUNNING
并且 Spark Streaming 作业被标记为 FINISHED
。
您的代码缺少对 ssc.awaitTermination
的调用以阻止驱动程序线程。
不幸的是,没有简单的方法可以在控制台上查看 map
函数内部的打印输出,因为这些函数调用发生在 YARN 执行器内部。 Cloudera Manager 提供了一个不错的日志查看方式,如果您真的需要在驱动程序上收集它们,您可以写入 HDFS 中的某个位置,然后自己从那里抓取各种日志。如果您要跟踪的信息是纯数字的,您还可以考虑使用 Accumulator.