在 AWS EMR 中部署 Flink 自定义 JAR 文件时出错

Error while deploying Flink custom JAR file in AWS EMR

基本上我想部署一个 Flink 自定义 JAR 文件到一个新的 AWS EMR 集群。这是我所做的总结。我创建了一个新的 AWS EMR 集群。

集群创建准备就绪后,我通过 SSH 连接到 EC2 机器并尝试部署自定义 jar 文件。以下是我每次尝试通过 CLI 部署它时遇到的不同错误。

1)

flink run -m yarn-cluster -yn 2 -c com.deepak.flink.examples.WordCount flink-examples-assembly-1.0.jar

Using the result of 'hadoop classpath' to augment the Hadoop classpath: /etc/hadoop/conf:/usr/lib/hadoop/lib/*:/usr/lib/hadoop/.//*:/usr/lib/hadoop-hdfs/./:/usr/lib/hadoop-hdfs/lib/*:/usr/lib/hadoop-hdfs/.//*:/usr/lib/hadoop-yarn/lib/*:/usr/lib/hadoop-yarn/.//*:/usr/lib/hadoop-mapreduce/lib/*:/usr/lib/hadoop-mapreduce/.//*::/usr/lib/hadoop-lzo/lib/*:/usr/share/aws/aws-java-sdk/*:/usr/share/aws/emr/emrfs/conf:/usr/share/aws/emr/emrfs/lib/*:/usr/share/aws/emr/emrfs/auxlib/*:/usr/share/aws/emr/cloudwatch-sink/lib/*:/usr/share/aws/emr/security/conf:/usr/share/aws/emr/security/lib/*
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/usr/lib/flink/lib/slf4j-log4j12-1.7.7.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/usr/lib/hadoop/lib/slf4j-log4j12-1.7.10.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
2018-10-09 06:30:36,766 INFO  org.apache.hadoop.yarn.client.RMProxy                         - Connecting to ResourceManager at ip-IPADDRESS.ec2.internal/IPADDRESS:8032
2018-10-09 06:30:36,909 INFO  org.apache.flink.yarn.cli.FlinkYarnSessionCli                 - No path for the flink jar passed. Using the location of class org.apache.flink.yarn.YarnClusterDescriptor to locate the jar
2018-10-09 06:30:37,168 INFO  org.apache.flink.yarn.AbstractYarnClusterDescriptor           - Killing YARN application

2)

flink run -c com.deepak.flink.examples.WordCount flink-examples-assembly-1.0.jar

Using the result of 'hadoop classpath' to augment the Hadoop classpath: /etc/hadoop/conf:/usr/lib/hadoop/lib/*:/usr/lib/hadoop/.//*:/usr/lib/hadoop-hdfs/./:/usr/lib/hadoop-hdfs/lib/*:/usr/lib/hadoop-hdfs/.//*:/usr/lib/hadoop-yarn/lib/*:/usr/lib/hadoop-yarn/.//*:/usr/lib/hadoop-mapreduce/lib/*:/usr/lib/hadoop-mapreduce/.//*::/usr/lib/hadoop-lzo/lib/*:/usr/share/aws/aws-java-sdk/*:/usr/share/aws/emr/emrfs/conf:/usr/share/aws/emr/emrfs/lib/*:/usr/share/aws/emr/emrfs/auxlib/*:/usr/share/aws/emr/cloudwatch-sink/lib/*:/usr/share/aws/emr/security/conf:/usr/share/aws/emr/security/lib/*
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/usr/lib/flink/lib/slf4j-log4j12-1.7.7.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/usr/lib/hadoop/lib/slf4j-log4j12-1.7.10.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]

------------------------------------------------------------
 The program finished with the following exception:

org.apache.flink.client.deployment.ClusterRetrieveException: Couldn't retrieve standalone cluster
    at org.apache.flink.client.deployment.StandaloneClusterDescriptor.retrieve(StandaloneClusterDescriptor.java:51)
    at org.apache.flink.client.deployment.StandaloneClusterDescriptor.retrieve(StandaloneClusterDescriptor.java:31)
    at org.apache.flink.client.cli.CliFrontend.runProgram(CliFrontend.java:253)
    at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:214)
    at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1025)
    at org.apache.flink.client.cli.CliFrontend.lambda$main(CliFrontend.java:1101)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:422)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1840)
    at org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
    at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1101)
Caused by: org.apache.flink.util.ConfigurationException: Config parameter 'Key: 'jobmanager.rpc.address' , default: null (deprecated keys: [])' is missing (hostname/address of JobManager to connect to).
    at org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils.getJobManagerAddress(HighAvailabilityServicesUtils.java:141)
    at org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils.createHighAvailabilityServices(HighAvailabilityServicesUtils.java:81)
    at org.apache.flink.client.program.ClusterClient.<init>(ClusterClient.java:158)
    at org.apache.flink.client.program.rest.RestClusterClient.<init>(RestClusterClient.java:183)
    at org.apache.flink.client.program.rest.RestClusterClient.<init>(RestClusterClient.java:156)
    at org.apache.flink.client.deployment.StandaloneClusterDescriptor.retrieve(StandaloneClusterDescriptor.java:49)
    ... 10 more

即使我尝试通过 AWS Web 部署 UI,jar 也无法部署。

所以,基本上我想将自定义 JAR 部署到 flink YARN 集群。我不确定 YARN flink 配置或其他任何东西缺少什么。提前感谢您的帮助。

您应该减少任务管理器的内存分配。目前,您正在尝试分配 51.2G 内存,而单个 m3.xlarge 机器只有 15G 内存,2 台机器集群总共 30G。