start/stop 在 yarn 中激发流作业的正确方法是什么?
What is the correct way to start/stop spark streaming jobs in yarn?
我已经试验和谷歌搜索了很多小时,但没有成功。
我有一个 Spark 流应用程序,运行在本地 Spark 集群中运行良好。现在我需要在cloudera 5.4.4上部署它。我需要能够启动它,让它 运行 在后台持续运行,并且能够停止它。
我试过这个:
$ spark-submit --master yarn-cluster --class MyMain my.jar myArgs
但它只是不停地打印这些行。
15/07/28 17:58:18 INFO Client: Application report for application_1438092860895_0012 (state: RUNNING)
15/07/28 17:58:19 INFO Client: Application report for application_1438092860895_0012 (state: RUNNING)
问题一:因为是流媒体应用,需要持续运行。那么如何在 "background" 模式下 运行 呢?我能找到的所有关于在 yarn 上提交 spark 作业的示例似乎都假设应用程序将执行一些工作并终止,因此您希望 运行 它在前台。但流式传输并非如此。
接下来...此时该应用程序似乎无法运行。我认为这可能是我的错误或配置错误,所以我尝试查看日志以查看发生了什么:
$ yarn logs -applicationId application_1438092860895_012
但它告诉我:
/tmp/logs/hdfs/logs/application_1438092860895_0012does not have any log files.
那么问题2:如果应用程序是运行,为什么它没有日志文件?
所以最终我不得不杀死它:
$ yarn application -kill application_1438092860895_012
这带来了 问题号 3:假设我最终可以启动应用程序并 运行在后台运行,"yarn application -kill" 是首选方式停止它?
- 您可以关闭
spark-submit
控制台。当写出 运行 状态时,作业已经在后台 运行。
- 日志在应用程序完成后立即可见。在运行时期间,所有日志都可以在本地的工作节点上直接访问(您可以在 YARN 资源管理器网站 UI 上看到),并在作业完成后 聚合到 HDFS.
yarn application -kill
可能是停止 Spark 流应用程序的最佳方式,但它并不完美。最好做一些优雅关闭来停止所有流接收器并停止流上下文,但我个人不知道该怎么做。
- 您的数据来源是什么?如果是靠谱的,比如kafka direct receiver,yarn kill shutdown应该没问题。当您的应用程序重新启动时,它将从最后一个完整的批处理偏移量开始读取。如果数据源不可靠,或者如果您想自己处理正常关闭,则必须在流上下文中实现某种外部挂钩。我遇到了同样的问题,最后实施了一个小技巧,在 webui 中添加了一个新选项卡作为停止按钮。
我终于想出了一个安全关闭 spark streaming 作业的方法。
- 写一个套接字服务器线程等待停止流上下文
package xxx.xxx.xxx
import java.io.{BufferedReader, InputStreamReader}
import java.net.{ServerSocket, Socket}
import org.apache.spark.streaming.StreamingContext
object KillServer {
class NetworkService(port: Int, ssc: StreamingContext) extends Runnable {
val serverSocket = new ServerSocket(port)
def run() {
Thread.currentThread().setName("Zhuangdy | Waiting for graceful stop at port " + port)
while (true) {
val socket = serverSocket.accept()
(new Handler(socket, ssc)).run()
}
}
}
class Handler(socket: Socket, ssc: StreamingContext) extends Runnable {
def run() {
val reader = new InputStreamReader(socket.getInputStream)
val br = new BufferedReader(reader)
if (br.readLine() == "kill") {
ssc.stop(true, true)
}
br.close();
}
}
def run(port:Int, ssc: StreamingContext): Unit ={
(new NetworkService(port, ssc)).run
}
}
在开始流上下文的 main
方法中,添加以下代码
ssc.start()
KillServer.run(11212, ssc)
ssc.awaitTermination()
编写spark-submit将作业提交到yarn,直接输出到一个文件中,供以后使用
spark-submit --class "com.Mainclass" \
--conf "spark.streaming.stopGracefullyOnShutdown=true" \
--master yarn-cluster --queue "root" \
--deploy-mode cluster \
--executor-cores 4 --num-executors 8 --executor-memory 3G \
hdfs:///xxx.jar > output 2>&1 &
- 最后,安全关闭 spark streaming 作业,不会丢失数据或计算结果不会保留!!! (用于正常停止流上下文的服务器套接字在驱动程序上是运行,所以你grep步骤3的输出来获取驱动程序地址,并使用echo nc发送一个套接字kill命令)
#!/bin/bash
driver=`cat output | grep ApplicationMaster | grep -Po '\d+.\d+.\d+.\d+'`
echo "kill" | nc $driver 11212
driverid=`yarn application -list 2>&1 | grep ad.Stat | grep -Po 'application_\d+_\d+'`
yarn application -kill $driverid
最后一个难题是如何以优雅的方式停止部署在 YARN 上的 Spark Streaming 应用程序。停止(或者说杀死)YARN 应用程序的标准方法是使用命令 yarn application -kill [applicationId]
。此命令会停止 Spark Streaming 应用程序,但这可能会在批处理过程中发生。因此,如果作业从 Kafka 读取数据,将处理结果保存在 HDFS 上并最终提交 Kafka 偏移量,那么当作业在提交偏移量之前停止时,您应该期望 HDFS 上有重复数据。
解决正常关机问题的第一个尝试是在关机挂钩中调用 Spark 流上下文停止方法。
sys.addShutdownHook {
streamingContext.stop(stopSparkContext = true, stopGracefully = true)
}
令人失望的是,调用关闭挂钩的时间太晚,无法完成启动的批处理,Spark 应用程序几乎立即被终止。此外,根本无法保证 JVM 会调用关闭挂钩。
在撰写此博客时post在 YARN 上正常关闭 Spark Streaming 应用程序的唯一确认方法是以某种方式通知应用程序计划关闭,然后以编程方式停止流上下文(但不是从关闭钩)。如果通知的应用程序在定义的超时后没有停止,则命令 yarn application -kill
只能作为最后的手段使用。
可以使用 HDFS 上的标记文件(最简单的方法)或使用在驱动程序上公开的简单 Socket/HTTP 端点(复杂的方法)通知应用程序计划关闭。
因为我喜欢KISS原理,你可以在下面找到shell使用标记文件启动/停止Spark Streaming应用程序的脚本伪代码:
start() {
hdfs dfs -touchz /path/to/marker/my_job_unique_name
spark-submit ...
}
stop() {
hdfs dfs -rm /path/to/marker/my_job_unique_name
force_kill=true
application_id=$(yarn application -list | grep -oe "application_[0-9]*_[0-9]*"`)
for i in `seq 1 10`; do
application_status=$(yarn application -status ${application_id} | grep "State : \(RUNNING\|ACCEPTED\)")
if [ -n "$application_status" ]; then
sleep 60s
else
force_kill=false
break
fi
done
$force_kill && yarn application -kill ${application_id}
}
在Spark Streaming应用中,后台线程要监听标记文件,当标记文件消失时停止上下文调用
streamingContext.stop(stopSparkContext = true, stopGracefully = true)
也可以参考http://blog.parseconsulting.com/2017/02/how-to-shutdown-spark-streaming-job.html
我已经试验和谷歌搜索了很多小时,但没有成功。
我有一个 Spark 流应用程序,运行在本地 Spark 集群中运行良好。现在我需要在cloudera 5.4.4上部署它。我需要能够启动它,让它 运行 在后台持续运行,并且能够停止它。
我试过这个:
$ spark-submit --master yarn-cluster --class MyMain my.jar myArgs
但它只是不停地打印这些行。
15/07/28 17:58:18 INFO Client: Application report for application_1438092860895_0012 (state: RUNNING)
15/07/28 17:58:19 INFO Client: Application report for application_1438092860895_0012 (state: RUNNING)
问题一:因为是流媒体应用,需要持续运行。那么如何在 "background" 模式下 运行 呢?我能找到的所有关于在 yarn 上提交 spark 作业的示例似乎都假设应用程序将执行一些工作并终止,因此您希望 运行 它在前台。但流式传输并非如此。
接下来...此时该应用程序似乎无法运行。我认为这可能是我的错误或配置错误,所以我尝试查看日志以查看发生了什么:
$ yarn logs -applicationId application_1438092860895_012
但它告诉我:
/tmp/logs/hdfs/logs/application_1438092860895_0012does not have any log files.
那么问题2:如果应用程序是运行,为什么它没有日志文件?
所以最终我不得不杀死它:
$ yarn application -kill application_1438092860895_012
这带来了 问题号 3:假设我最终可以启动应用程序并 运行在后台运行,"yarn application -kill" 是首选方式停止它?
- 您可以关闭
spark-submit
控制台。当写出 运行 状态时,作业已经在后台 运行。 - 日志在应用程序完成后立即可见。在运行时期间,所有日志都可以在本地的工作节点上直接访问(您可以在 YARN 资源管理器网站 UI 上看到),并在作业完成后 聚合到 HDFS.
yarn application -kill
可能是停止 Spark 流应用程序的最佳方式,但它并不完美。最好做一些优雅关闭来停止所有流接收器并停止流上下文,但我个人不知道该怎么做。
- 您的数据来源是什么?如果是靠谱的,比如kafka direct receiver,yarn kill shutdown应该没问题。当您的应用程序重新启动时,它将从最后一个完整的批处理偏移量开始读取。如果数据源不可靠,或者如果您想自己处理正常关闭,则必须在流上下文中实现某种外部挂钩。我遇到了同样的问题,最后实施了一个小技巧,在 webui 中添加了一个新选项卡作为停止按钮。
我终于想出了一个安全关闭 spark streaming 作业的方法。
- 写一个套接字服务器线程等待停止流上下文
package xxx.xxx.xxx import java.io.{BufferedReader, InputStreamReader} import java.net.{ServerSocket, Socket} import org.apache.spark.streaming.StreamingContext object KillServer { class NetworkService(port: Int, ssc: StreamingContext) extends Runnable { val serverSocket = new ServerSocket(port) def run() { Thread.currentThread().setName("Zhuangdy | Waiting for graceful stop at port " + port) while (true) { val socket = serverSocket.accept() (new Handler(socket, ssc)).run() } } } class Handler(socket: Socket, ssc: StreamingContext) extends Runnable { def run() { val reader = new InputStreamReader(socket.getInputStream) val br = new BufferedReader(reader) if (br.readLine() == "kill") { ssc.stop(true, true) } br.close(); } } def run(port:Int, ssc: StreamingContext): Unit ={ (new NetworkService(port, ssc)).run } }
在开始流上下文的
main
方法中,添加以下代码ssc.start() KillServer.run(11212, ssc) ssc.awaitTermination()
编写spark-submit将作业提交到yarn,直接输出到一个文件中,供以后使用
spark-submit --class "com.Mainclass" \ --conf "spark.streaming.stopGracefullyOnShutdown=true" \ --master yarn-cluster --queue "root" \ --deploy-mode cluster \ --executor-cores 4 --num-executors 8 --executor-memory 3G \ hdfs:///xxx.jar > output 2>&1 &
- 最后,安全关闭 spark streaming 作业,不会丢失数据或计算结果不会保留!!! (用于正常停止流上下文的服务器套接字在驱动程序上是运行,所以你grep步骤3的输出来获取驱动程序地址,并使用echo nc发送一个套接字kill命令)
#!/bin/bash driver=`cat output | grep ApplicationMaster | grep -Po '\d+.\d+.\d+.\d+'` echo "kill" | nc $driver 11212 driverid=`yarn application -list 2>&1 | grep ad.Stat | grep -Po 'application_\d+_\d+'` yarn application -kill $driverid
最后一个难题是如何以优雅的方式停止部署在 YARN 上的 Spark Streaming 应用程序。停止(或者说杀死)YARN 应用程序的标准方法是使用命令 yarn application -kill [applicationId]
。此命令会停止 Spark Streaming 应用程序,但这可能会在批处理过程中发生。因此,如果作业从 Kafka 读取数据,将处理结果保存在 HDFS 上并最终提交 Kafka 偏移量,那么当作业在提交偏移量之前停止时,您应该期望 HDFS 上有重复数据。
解决正常关机问题的第一个尝试是在关机挂钩中调用 Spark 流上下文停止方法。
sys.addShutdownHook {
streamingContext.stop(stopSparkContext = true, stopGracefully = true)
}
令人失望的是,调用关闭挂钩的时间太晚,无法完成启动的批处理,Spark 应用程序几乎立即被终止。此外,根本无法保证 JVM 会调用关闭挂钩。
在撰写此博客时post在 YARN 上正常关闭 Spark Streaming 应用程序的唯一确认方法是以某种方式通知应用程序计划关闭,然后以编程方式停止流上下文(但不是从关闭钩)。如果通知的应用程序在定义的超时后没有停止,则命令 yarn application -kill
只能作为最后的手段使用。
可以使用 HDFS 上的标记文件(最简单的方法)或使用在驱动程序上公开的简单 Socket/HTTP 端点(复杂的方法)通知应用程序计划关闭。
因为我喜欢KISS原理,你可以在下面找到shell使用标记文件启动/停止Spark Streaming应用程序的脚本伪代码:
start() {
hdfs dfs -touchz /path/to/marker/my_job_unique_name
spark-submit ...
}
stop() {
hdfs dfs -rm /path/to/marker/my_job_unique_name
force_kill=true
application_id=$(yarn application -list | grep -oe "application_[0-9]*_[0-9]*"`)
for i in `seq 1 10`; do
application_status=$(yarn application -status ${application_id} | grep "State : \(RUNNING\|ACCEPTED\)")
if [ -n "$application_status" ]; then
sleep 60s
else
force_kill=false
break
fi
done
$force_kill && yarn application -kill ${application_id}
}
在Spark Streaming应用中,后台线程要监听标记文件,当标记文件消失时停止上下文调用
streamingContext.stop(stopSparkContext = true, stopGracefully = true)
也可以参考http://blog.parseconsulting.com/2017/02/how-to-shutdown-spark-streaming-job.html