SparkLauncher waiting infinitely for job completion
I am trying to submit a JAR containing a Spark job to a YARN cluster from Java code. I am using SparkLauncher to submit the SparkPi example:
Process spark = new SparkLauncher()
        .setAppResource("C:\\spark-1.4.1-bin-hadoop2.6\\lib\\spark-examples-1.4.1-hadoop2.6.0.jar")
        .setMainClass("org.apache.spark.examples.SparkPi")
        .setMaster("yarn-cluster")
        .launch();
System.out.println("Waiting for finish...");
int exitCode = spark.waitFor();
System.out.println("Finished! Exit code:" + exitCode);
There are two problems:
- When submitting in "yarn-cluster" mode, the application is submitted to YARN successfully and executes successfully (it is visible in the YARN UI, reported as SUCCESS, and pi is printed in the output). However, the submitting application is never notified that processing has finished - it hangs infinitely after printing "Waiting for finish..." The container's logs can be found here
- When submitting in "yarn-client" mode, the application does not appear in the YARN UI and the submitting application hangs at "Waiting for finish..." When the hanging process is killed, the application shows up in the YARN UI and is reported as SUCCESS, but the output is empty (pi is not printed). The container's logs can be found here
I tried executing the submitting application with both Oracle Java 7 and 8.
I got help on the Spark mailing list. The key is to read / clear getInputStream and getErrorStream() on the Process. The child process may fill up the buffers and cause a deadlock - see the Oracle docs regarding Process. The streams should be read in separate threads:
Process spark = new SparkLauncher()
        .setSparkHome("C:\\spark-1.4.1-bin-hadoop2.6")
        .setAppResource("C:\\spark-1.4.1-bin-hadoop2.6\\lib\\spark-examples-1.4.1-hadoop2.6.0.jar")
        .setMainClass("org.apache.spark.examples.SparkPi")
        .setMaster("yarn-cluster")
        .launch();

InputStreamReaderRunnable inputStreamReaderRunnable = new InputStreamReaderRunnable(spark.getInputStream(), "input");
Thread inputThread = new Thread(inputStreamReaderRunnable, "LogStreamReader input");
inputThread.start();

InputStreamReaderRunnable errorStreamReaderRunnable = new InputStreamReaderRunnable(spark.getErrorStream(), "error");
Thread errorThread = new Thread(errorStreamReaderRunnable, "LogStreamReader error");
errorThread.start();

System.out.println("Waiting for finish...");
int exitCode = spark.waitFor();
System.out.println("Finished! Exit code:" + exitCode);
where the InputStreamReaderRunnable class is:
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;

public class InputStreamReaderRunnable implements Runnable {

    private final BufferedReader reader;
    private final String name;

    public InputStreamReaderRunnable(InputStream is, String name) {
        this.reader = new BufferedReader(new InputStreamReader(is));
        this.name = name;
    }

    public void run() {
        System.out.println("InputStream " + name + ":");
        try {
            // Drain the stream line by line so the child process cannot
            // block on a full output buffer.
            String line = reader.readLine();
            while (line != null) {
                System.out.println(line);
                line = reader.readLine();
            }
            reader.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}
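Side note (my own addition, not from the mailing list thread): newer launcher versions (Spark 2.x, if I remember correctly) can redirect the child process output themselves via redirectOutput/redirectError, which avoids the reader threads altogether. A minimal sketch, with placeholder paths and file names:

import java.io.File;

// Sketch only: assumes a Spark 2.x SparkLauncher; paths and file names are placeholders.
Process spark = new SparkLauncher()
        .setAppResource("/path/to/spark-examples.jar")
        .setMainClass("org.apache.spark.examples.SparkPi")
        .setMaster("yarn-cluster")
        .redirectOutput(new File("spark-stdout.log")) // child stdout written to a file
        .redirectError(new File("spark-stderr.log"))  // child stderr written to a file
        .launch();
int exitCode = spark.waitFor(); // no risk of a full pipe buffer now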
Since this is an old post, I would like to add an update that might help whoever reads it later. In Spark 1.6.0 some functionality was added to the SparkLauncher class, namely:
def startApplication(listeners: <repeated...>[Listener]): SparkAppHandle
http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.launcher.SparkLauncher
You can run the application without needing extra threads for stdout and stderr handling, and there is nice status reporting for the running application. Use this code:
import scala.collection.JavaConverters._

val env = Map(
  "HADOOP_CONF_DIR" -> hadoopConfDir,
  "YARN_CONF_DIR" -> yarnConfDir
)

val handle = new SparkLauncher(env.asJava)
  .setSparkHome(sparkHome)
  .setAppResource("Jar/location/.jar")
  .setMainClass("path.to.the.main.class")
  .setMaster("yarn-client")
  .setConf("spark.app.id", "AppID if you have one")
  .setConf("spark.driver.memory", "8g")
  .setConf("spark.akka.frameSize", "200")
  .setConf("spark.executor.memory", "2g")
  .setConf("spark.executor.instances", "32")
  .setConf("spark.executor.cores", "32")
  .setConf("spark.default.parallelism", "100")
  .setConf("spark.driver.allowMultipleContexts", "true")
  .setVerbose(true)
  .startApplication()

println(handle.getAppId)
println(handle.getState)
You can keep querying the state of the Spark application until it succeeds.
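For example, a minimal polling sketch (written in Java, since the SparkAppHandle API is the same there; the helper name and the 3-second interval are my own choices):

import org.apache.spark.launcher.SparkAppHandle;

// Blocks until the application reaches a final state (FINISHED, FAILED, KILLED or LOST).
static void waitUntilFinal(SparkAppHandle handle) throws InterruptedException {
    while (!handle.getState().isFinal()) {
        System.out.println("App " + handle.getAppId() + " is in state " + handle.getState());
        Thread.sleep(3000L); // arbitrary polling interval
    }
    System.out.println("Final state: " + handle.getState());
}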
For information on how the Spark Launcher server works in 1.6.0, see this link:
https://github.com/apache/spark/blob/v1.6.0/launcher/src/main/java/org/apache/spark/launcher/LauncherServer.java
I implemented this using a CountDownLatch, and it works as expected. This is for SparkLauncher version 2.0.1 and it also works in yarn-cluster mode.
...
final CountDownLatch countDownLatch = new CountDownLatch(1);
SparkAppListener sparkAppListener = new SparkAppListener(countDownLatch);
SparkAppHandle appHandle = sparkLauncher.startApplication(sparkAppListener);
Thread sparkAppListenerThread = new Thread(sparkAppListener);
sparkAppListenerThread.start();
long timeout = 120;
countDownLatch.await(timeout, TimeUnit.SECONDS);
...

private static class SparkAppListener implements SparkAppHandle.Listener, Runnable {

    private static final Log log = LogFactory.getLog(SparkAppListener.class);

    private final CountDownLatch countDownLatch;

    public SparkAppListener(CountDownLatch countDownLatch) {
        this.countDownLatch = countDownLatch;
    }

    @Override
    public void stateChanged(SparkAppHandle handle) {
        String sparkAppId = handle.getAppId();
        State appState = handle.getState();
        if (sparkAppId != null) {
            log.info("Spark job with app id: " + sparkAppId + ",\t State changed to: " + appState + " - "
                    + SPARK_STATE_MSG.get(appState));
        } else {
            log.info("Spark job's state changed to: " + appState + " - " + SPARK_STATE_MSG.get(appState));
        }
        if (appState != null && appState.isFinal()) {
            countDownLatch.countDown();
        }
    }

    @Override
    public void infoChanged(SparkAppHandle handle) {}

    @Override
    public void run() {}
}
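One detail worth adding (my own note, not part of the snippet above): await returns false when the timeout expires, so it is worth checking that return value together with the final state before treating the job as successful. A rough sketch:

boolean reachedFinalState = countDownLatch.await(timeout, TimeUnit.SECONDS);
if (!reachedFinalState) {
    // Timed out before the listener saw a final state; optionally give up on the app.
    appHandle.kill();
} else if (appHandle.getState() != SparkAppHandle.State.FINISHED) {
    throw new IllegalStateException("Spark job ended in state " + appHandle.getState());
}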