运行 Java 通过 flink yarn 集群上的 maven 包含配置的 Jar
Running Java Jar with included config via maven on flink yarn cluster
我在 maven/java 项目中使用 flink,需要在创建的 jar 内部包含我的配置。
所以,我在我的 pom 文件中添加了以下内容。这包括我在 jar 中的所有 yml 配置(位于 src/main/resources 文件夹中),我将在执行时将其名称作为参数传递。
<resources>
<resource>
<directory>src/main/resources</directory>
<includes>
<include>**/*.yml</include>
</includes>
</resource>
</resources>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>2.4.3</version>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<filters>
<filter>
<artifact>*:*</artifact>
<excludes>
<exclude>META-INF/*.SF</exclude>
<exclude>META-INF/*.DSA</exclude>
<exclude>META-INF/*.RSA</exclude>
</excludes>
</filter>
</filters>
<finalName>${project.artifactId}-${project.version}</finalName>
<shadedArtifactAttached>true</shadedArtifactAttached>
<transformers>
<transformer
implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
<mainClass>com.exmaple.MyApplication</mainClass>
</transformer>
</transformers>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
以下主要 class 代码接收一个参数,我根据该参数决定从资源中选择什么配置,读取(使用 snakeyaml)并使用。
public static void main(String[] args) throws Exception {
final ParameterTool parameterTool = ParameterTool.fromArgs(args);
ClassLoader classLoader = MyApplication.class.getClassLoader();
Yaml yaml = new Yaml();
String filename = parameterTool.getRequired("configFilename");
InputStream in = classLoader.getSystemResourceAsStream(filename);
MyConfigClass = yaml.loadAs(in, MyConfigClass.class);
...
}
mvn 全新安装创建 "my-shaded-jar.jar"
我使用命令
执行
java -jar /path/to/my-shaded-jar.jar --configFilename filename
如果我与其他人共享 jar,它可以在多个系统上运行。
但是,当我尝试使用以下命令 运行 Hadoop 上的 yarn 集群中的同一个 jar 时,我遇到了问题:-
HADOOP_CLASSPATH=`hadoop classpath` HADOOP_CONF_DIR=/etc/hadoop/conf ./flink-1.6.2/bin/flink run -m yarn-cluster -yd -yn 5 -ys 30 -yjm 10240 -ytm 10240 -yst -ynm some-job-name -yqu queue-name ./my-shaded-jar.jar --configFilename filename
我收到以下错误:
------------------------------------------------------------
The program finished with the following exception:
org.apache.flink.client.program.ProgramInvocationException: The main method caused an error.
at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:546)
at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:421)
at org.apache.flink.client.program.OptimizerPlanEnvironment.getOptimizedPlan(OptimizerPlanEnvironment.java:83)
at org.apache.flink.client.program.PackagedProgramUtils.createJobGraph(PackagedProgramUtils.java:78)
at org.apache.flink.client.program.PackagedProgramUtils.createJobGraph(PackagedProgramUtils.java:120)
at org.apache.flink.client.cli.CliFrontend.runProgram(CliFrontend.java:238)
at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:216)
at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1053)
at org.apache.flink.client.cli.CliFrontend.lambda$main(CliFrontend.java:1129)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1754)
at org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1129)
Caused by: org.yaml.snakeyaml.error.YAMLException: java.io.IOException: Stream closed
at org.yaml.snakeyaml.reader.StreamReader.update(StreamReader.java:200)
at org.yaml.snakeyaml.reader.StreamReader.<init>(StreamReader.java:60)
at org.yaml.snakeyaml.Yaml.loadAs(Yaml.java:444)
at com.example.MyApplication.main(MyApplication.java:53)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:529)
... 13 more
Caused by: java.io.IOException: Stream closed
at java.io.PushbackInputStream.ensureOpen(PushbackInputStream.java:74)
at java.io.PushbackInputStream.read(PushbackInputStream.java:166)
at org.yaml.snakeyaml.reader.UnicodeReader.init(UnicodeReader.java:90)
at org.yaml.snakeyaml.reader.UnicodeReader.read(UnicodeReader.java:122)
at java.io.Reader.read(Reader.java:140)
at org.yaml.snakeyaml.reader.StreamReader.update(StreamReader.java:184)
为什么我的解决方案适用于任何正常的 linux/mac 系统,但是当在 yarn 集群上执行 运行 flink 运行 命令时,具有相同参数的同一个 jar 会失败。
我们一般执行jar的方式和yarn执行的方式有区别吗
感谢任何帮助。
将 classLoader.getSystemResourceAsStream(filename)
替换为 classLoader.getResourceAsStream(filename)
。
java.lang.ClassLoader#getSystemResourceAsStream
通过系统class加载器定位资源,通常用于启动应用程序。
java.lang.ClassLoader#getResourceAsStream
将首先搜索父 class 加载器。如果失败,它将调用当前 class 加载程序的 findResource
。
为了避免依赖冲突,class将Flink应用中的es划分为两个域[1],同样适用于Flink客户端,如CliFrontend
。
Java 类路径 包括 Apache Flink 的 class 及其核心依赖项。
动态用户代码 包括用户 jar 的 classes(和资源)。
因此,为了找到打包在您的 jar 文件中的 "config file",我们应该使用用户代码 class 加载器(您可以在中找到 userCodeClassLoader
的详细信息org.apache.flink.client.program.PackagedProgram
),而不是系统 classloader。
我在 maven/java 项目中使用 flink,需要在创建的 jar 内部包含我的配置。
所以,我在我的 pom 文件中添加了以下内容。这包括我在 jar 中的所有 yml 配置(位于 src/main/resources 文件夹中),我将在执行时将其名称作为参数传递。
<resources>
<resource>
<directory>src/main/resources</directory>
<includes>
<include>**/*.yml</include>
</includes>
</resource>
</resources>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>2.4.3</version>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<filters>
<filter>
<artifact>*:*</artifact>
<excludes>
<exclude>META-INF/*.SF</exclude>
<exclude>META-INF/*.DSA</exclude>
<exclude>META-INF/*.RSA</exclude>
</excludes>
</filter>
</filters>
<finalName>${project.artifactId}-${project.version}</finalName>
<shadedArtifactAttached>true</shadedArtifactAttached>
<transformers>
<transformer
implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
<mainClass>com.exmaple.MyApplication</mainClass>
</transformer>
</transformers>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
以下主要 class 代码接收一个参数,我根据该参数决定从资源中选择什么配置,读取(使用 snakeyaml)并使用。
public static void main(String[] args) throws Exception {
final ParameterTool parameterTool = ParameterTool.fromArgs(args);
ClassLoader classLoader = MyApplication.class.getClassLoader();
Yaml yaml = new Yaml();
String filename = parameterTool.getRequired("configFilename");
InputStream in = classLoader.getSystemResourceAsStream(filename);
MyConfigClass = yaml.loadAs(in, MyConfigClass.class);
...
}
mvn 全新安装创建 "my-shaded-jar.jar"
我使用命令
执行java -jar /path/to/my-shaded-jar.jar --configFilename filename
如果我与其他人共享 jar,它可以在多个系统上运行。
但是,当我尝试使用以下命令 运行 Hadoop 上的 yarn 集群中的同一个 jar 时,我遇到了问题:-
HADOOP_CLASSPATH=`hadoop classpath` HADOOP_CONF_DIR=/etc/hadoop/conf ./flink-1.6.2/bin/flink run -m yarn-cluster -yd -yn 5 -ys 30 -yjm 10240 -ytm 10240 -yst -ynm some-job-name -yqu queue-name ./my-shaded-jar.jar --configFilename filename
我收到以下错误:
------------------------------------------------------------
The program finished with the following exception:
org.apache.flink.client.program.ProgramInvocationException: The main method caused an error.
at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:546)
at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:421)
at org.apache.flink.client.program.OptimizerPlanEnvironment.getOptimizedPlan(OptimizerPlanEnvironment.java:83)
at org.apache.flink.client.program.PackagedProgramUtils.createJobGraph(PackagedProgramUtils.java:78)
at org.apache.flink.client.program.PackagedProgramUtils.createJobGraph(PackagedProgramUtils.java:120)
at org.apache.flink.client.cli.CliFrontend.runProgram(CliFrontend.java:238)
at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:216)
at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1053)
at org.apache.flink.client.cli.CliFrontend.lambda$main(CliFrontend.java:1129)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1754)
at org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1129)
Caused by: org.yaml.snakeyaml.error.YAMLException: java.io.IOException: Stream closed
at org.yaml.snakeyaml.reader.StreamReader.update(StreamReader.java:200)
at org.yaml.snakeyaml.reader.StreamReader.<init>(StreamReader.java:60)
at org.yaml.snakeyaml.Yaml.loadAs(Yaml.java:444)
at com.example.MyApplication.main(MyApplication.java:53)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:529)
... 13 more
Caused by: java.io.IOException: Stream closed
at java.io.PushbackInputStream.ensureOpen(PushbackInputStream.java:74)
at java.io.PushbackInputStream.read(PushbackInputStream.java:166)
at org.yaml.snakeyaml.reader.UnicodeReader.init(UnicodeReader.java:90)
at org.yaml.snakeyaml.reader.UnicodeReader.read(UnicodeReader.java:122)
at java.io.Reader.read(Reader.java:140)
at org.yaml.snakeyaml.reader.StreamReader.update(StreamReader.java:184)
为什么我的解决方案适用于任何正常的 linux/mac 系统,但是当在 yarn 集群上执行 运行 flink 运行 命令时,具有相同参数的同一个 jar 会失败。 我们一般执行jar的方式和yarn执行的方式有区别吗
感谢任何帮助。
将 classLoader.getSystemResourceAsStream(filename)
替换为 classLoader.getResourceAsStream(filename)
。
java.lang.ClassLoader#getSystemResourceAsStream
通过系统class加载器定位资源,通常用于启动应用程序。java.lang.ClassLoader#getResourceAsStream
将首先搜索父 class 加载器。如果失败,它将调用当前 class 加载程序的findResource
。
为了避免依赖冲突,class将Flink应用中的es划分为两个域[1],同样适用于Flink客户端,如CliFrontend
。
Java 类路径 包括 Apache Flink 的 class 及其核心依赖项。
动态用户代码 包括用户 jar 的 classes(和资源)。
因此,为了找到打包在您的 jar 文件中的 "config file",我们应该使用用户代码 class 加载器(您可以在中找到 userCodeClassLoader
的详细信息org.apache.flink.client.program.PackagedProgram
),而不是系统 classloader。