从 YARN 上的另一个程序启动的 Flink 作业失败并显示 "JobClientActor seems to have died"
Flink job started from another program on YARN fails with "JobClientActor seems to have died"
我是 flink 新用户,遇到以下问题。
我在 YARN 集群上使用 flink 将从 RDBMS 中提取的相关数据传输到 HBase。
我在具有多个 ExecutionEnvironments 的 java 上编写 flink 批处理应用程序(每个 RDB table 一个以并行传输 table 行)以通过 table 顺序传输 table (因为env.execute() 的调用正在阻塞)。
我这样开始 YARN 会话
export YARN_CONF_DIR=/etc/hadoop/conf
export FLINK_HOME=/opt/flink-1.3.1
export FLINK_CONF_DIR=$FLINK_HOME/conf
$FLINK_HOME/bin/yarn-session.sh -n 1 -s 4 -d -jm 2048 -tm 8096
然后我 运行 我在 YARN 会话上的应用程序通过 shell 脚本 transfer.sh 启动。它的内容在这里
#!/bin/bash
export YARN_CONF_DIR=/etc/hadoop/conf
export FLINK_HOME=/opt/flink-1.3.1
export FLINK_CONF_DIR=$FLINK_HOME/conf
$FLINK_HOME/bin/flink run -p 4 transfer.jar
当我从命令行手动启动此脚本时,它工作正常 - 作业被一个接一个地提交到 YARN 会话,没有错误。
现在我应该可以从另一个 java 程序 运行 这个脚本了。
为此,我使用
Runtime.exec("transfer.sh");
(也许有更好的方法来做到这一点?我在 REST API 看到过,但有一些困难,因为作业管理器由 YARN 代理)。
一开始是像往常一样工作 - 首先将几个作业提交给会话并成功完成。但以下作业未提交到 YARN 会话。
在 /opt/flink-1.3.1/log/flink-tsvetkoff-client-hadoop-dev1.log 中我看到错误(在 DEBUG 级别没有发现其他错误)
The program execution failed: JobClientActor seems to have died before the JobExecutionResult could be retrieved.
我试图自己分析这个问题,发现这个错误发生在 JobClient class 向 JobClientActor(即 YARN 集群)发送超时的 ping 请求时。
我尝试增加多个心跳和超时选项,例如 akka.*.timeout、akka.watch.heartbeat.* 和 yarn.heartbeat-delay 选项,但它没有解决问题 - 新作业不会从 CliFrontend 提交到 YARN 会话。
两种情况(手动调用和从另一个程序调用)的环境是相同的。当我打电话给
$ ps axu | grep transfer
它会给我输出
/usr/lib/jvm/java-8-oracle/bin/java -Dlog.file=/opt/flink-1.3.1/log/flink-tsvetkoff-client-hadoop-dev1.log -Dlog4j.configuration=file:/opt/flink-1.3.1/conf/log4j-cli.properties -Dlogback.configurationFile=file:/opt/flink-1.3.1/conf/logback.xml -classpath /opt/flink-1.3.1/lib/flink-metrics-graphite-1.3.1.jar:/opt/flink-1.3.1/lib/flink-python_2.11-1.3.1.jar:/opt/flink-1.3.1/lib/flink-shaded-hadoop2-uber-1.3.1.jar:/opt/flink-1.3.1/lib/log4j-1.2.17.jar:/opt/flink-1.3.1/lib/slf4j-log4j12-1.7.7.jar:/opt/flink-1.3.1/lib/flink-dist_2.11-1.3.1.jar:::/etc/hadoop/conf org.apache.flink.client.CliFrontend run -p 4 transfer.jar
我也试过更新flink到1.4.0版本或者改变job的并行度(甚至是-p 1)但是还是报错
我不知道会有什么不同?顺便说一句,有什么解决方法吗?
感谢您的帮助。
我终于找到了解决该错误的方法
只需将 Runtime.exec(...)
替换为 new ProcessBuilder(...).inheritIO().start()
.
我真的不知道为什么 inheritIO
的调用在这种情况下有帮助,因为据我所知,它只是将 IO 流从子进程重定向到父进程。
但是我检查过,如果我注释掉这行代码,程序又开始下降了。
我是 flink 新用户,遇到以下问题。 我在 YARN 集群上使用 flink 将从 RDBMS 中提取的相关数据传输到 HBase。 我在具有多个 ExecutionEnvironments 的 java 上编写 flink 批处理应用程序(每个 RDB table 一个以并行传输 table 行)以通过 table 顺序传输 table (因为env.execute() 的调用正在阻塞)。
我这样开始 YARN 会话
export YARN_CONF_DIR=/etc/hadoop/conf
export FLINK_HOME=/opt/flink-1.3.1
export FLINK_CONF_DIR=$FLINK_HOME/conf
$FLINK_HOME/bin/yarn-session.sh -n 1 -s 4 -d -jm 2048 -tm 8096
然后我 运行 我在 YARN 会话上的应用程序通过 shell 脚本 transfer.sh 启动。它的内容在这里
#!/bin/bash
export YARN_CONF_DIR=/etc/hadoop/conf
export FLINK_HOME=/opt/flink-1.3.1
export FLINK_CONF_DIR=$FLINK_HOME/conf
$FLINK_HOME/bin/flink run -p 4 transfer.jar
当我从命令行手动启动此脚本时,它工作正常 - 作业被一个接一个地提交到 YARN 会话,没有错误。
现在我应该可以从另一个 java 程序 运行 这个脚本了。 为此,我使用
Runtime.exec("transfer.sh");
(也许有更好的方法来做到这一点?我在 REST API 看到过,但有一些困难,因为作业管理器由 YARN 代理)。 一开始是像往常一样工作 - 首先将几个作业提交给会话并成功完成。但以下作业未提交到 YARN 会话。 在 /opt/flink-1.3.1/log/flink-tsvetkoff-client-hadoop-dev1.log 中我看到错误(在 DEBUG 级别没有发现其他错误)
The program execution failed: JobClientActor seems to have died before the JobExecutionResult could be retrieved.
我试图自己分析这个问题,发现这个错误发生在 JobClient class 向 JobClientActor(即 YARN 集群)发送超时的 ping 请求时。 我尝试增加多个心跳和超时选项,例如 akka.*.timeout、akka.watch.heartbeat.* 和 yarn.heartbeat-delay 选项,但它没有解决问题 - 新作业不会从 CliFrontend 提交到 YARN 会话。
两种情况(手动调用和从另一个程序调用)的环境是相同的。当我打电话给
$ ps axu | grep transfer
它会给我输出
/usr/lib/jvm/java-8-oracle/bin/java -Dlog.file=/opt/flink-1.3.1/log/flink-tsvetkoff-client-hadoop-dev1.log -Dlog4j.configuration=file:/opt/flink-1.3.1/conf/log4j-cli.properties -Dlogback.configurationFile=file:/opt/flink-1.3.1/conf/logback.xml -classpath /opt/flink-1.3.1/lib/flink-metrics-graphite-1.3.1.jar:/opt/flink-1.3.1/lib/flink-python_2.11-1.3.1.jar:/opt/flink-1.3.1/lib/flink-shaded-hadoop2-uber-1.3.1.jar:/opt/flink-1.3.1/lib/log4j-1.2.17.jar:/opt/flink-1.3.1/lib/slf4j-log4j12-1.7.7.jar:/opt/flink-1.3.1/lib/flink-dist_2.11-1.3.1.jar:::/etc/hadoop/conf org.apache.flink.client.CliFrontend run -p 4 transfer.jar
我也试过更新flink到1.4.0版本或者改变job的并行度(甚至是-p 1)但是还是报错
我不知道会有什么不同?顺便说一句,有什么解决方法吗?
感谢您的帮助。
我终于找到了解决该错误的方法
只需将 Runtime.exec(...)
替换为 new ProcessBuilder(...).inheritIO().start()
.
我真的不知道为什么 inheritIO
的调用在这种情况下有帮助,因为据我所知,它只是将 IO 流从子进程重定向到父进程。
但是我检查过,如果我注释掉这行代码,程序又开始下降了。