Amazon EMR: error with Hadoop recoverable writers when submitting an Apache Flink job
Added dependency POM details:
<dependencies>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients_2.11</artifactId>
        <version>1.7.1</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_2.11</artifactId>
        <version>1.7.1</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-core</artifactId>
        <version>1.7.1</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>1.7.1</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-runtime_2.11</artifactId>
        <version>1.7.1</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table_2.11</artifactId>
        <version>1.7.1</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-kafka-0.10_2.11</artifactId>
        <version>1.7.1</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-filesystem_2.11</artifactId>
        <version>1.7.1</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-hadoop-compatibility_2.11</artifactId>
        <version>1.7.1</version>
        <scope>test</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-s3-fs-hadoop</artifactId>
        <version>1.7.1</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-shaded-hadoop</artifactId>
        <version>1.7.1</version>
        <type>pom</type>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-aws</artifactId>
        <version>2.8.5</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-hdfs</artifactId>
        <version>2.8.5</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-common</artifactId>
        <version>2.8.5</version>
    </dependency>
    <dependency>
        <groupId>com.amazonaws</groupId>
        <artifactId>aws-java-sdk-s3</artifactId>
        <version>1.11.529</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-connectors</artifactId>
        <version>1.1.5</version>
        <type>pom</type>
    </dependency>
</dependencies>
java.lang.UnsupportedOperationException: Recoverable writers on Hadoop are only supported for HDFS and for Hadoop version 2.7 or newer
    at org.apache.flink.runtime.fs.hdfs.HadoopRecoverableWriter.<init>(HadoopRecoverableWriter.java:57)
    at org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.createRecoverableWriter(HadoopFileSystem.java:202)
    at org.apache.flink.core.fs.SafetyNetWrapperFileSystem.createRecoverableWriter(SafetyNetWrapperFileSystem.java:69)
    at org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.<init>(Buckets.java:112)
    at org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink$RowFormatBuilder.createBuckets(StreamingFileSink.java:242)
    at org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink.initializeState(StreamingFileSink.java:327)
    at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:178)
    at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:160)
    at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:96)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:278)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:738)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:289)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:704)
    at java.lang.Thread.run(Thread.java:748)
To use Flink's StreamingFileSink with exactly-once guarantees, you need Hadoop >= 2.7. Versions below 2.7 are not supported, so make sure the Hadoop version running on EMR is recent enough.
Flink uses Java's ServiceLoader to load the components it needs to interface with pluggable file systems. To see where Flink does this in the code, go to org.apache.flink.core.fs.FileSystem. Note the initialize function, which uses the RAW_FACTORIES variable. RAW_FACTORIES is created by the function loadFileSystems, which you can see makes use of Java's ServiceLoader.
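The discovery step can be sketched in plain Java without Flink on the classpath. This is only an illustration of the ServiceLoader mechanism: DemoFileSystemFactory is a hypothetical stand-in for Flink's factory interface, not Flink's actual type, and the class and method names are assumptions made for the example.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.ServiceLoader;

class ServiceLoaderSketch {

    // Hypothetical stand-in for a file system factory interface (not Flink's real type).
    interface DemoFileSystemFactory {
        String getScheme();
    }

    // Collects the scheme of every provider that ServiceLoader finds on the classpath.
    // Providers are declared in META-INF/services/<binary name of the interface>.
    static List<String> discoverSchemes() {
        List<String> schemes = new ArrayList<>();
        for (DemoFileSystemFactory factory : ServiceLoader.load(DemoFileSystemFactory.class)) {
            schemes.add(factory.getScheme());
        }
        return schemes;
    }

    public static void main(String[] args) {
        // With no provider JAR on the classpath, nothing is discovered.
        System.out.println("Discovered schemes: " + discoverSchemes());
    }
}
```

The point of the sketch is that a provider is only found if a JAR declaring it is already on the classpath when the lookup runs, which is why the location of the file system JAR matters.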
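The file system components need to be set up before your application starts on Flink. This means your Flink application does not need to bundle these components; they should be provided for your application.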
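EMR does not provide the S3 file system components that Flink needs out of the box. The exception is thrown not because the Hadoop version is too low, but because Flink loaded the HadoopFileSystem when no file system matched the s3 scheme.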
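The fallback behaviour described here can be pictured with a small lookup sketch. It is a simplification under stated assumptions: the map of registered schemes, the implementation names stored in it, and the bucket URI are all hypothetical, and Flink's real lookup logic is more involved than a single map access.

```java
import java.net.URI;
import java.util.HashMap;
import java.util.Map;

class SchemeFallbackSketch {

    // Name used when no registered factory matches the URI scheme.
    static final String FALLBACK = "HadoopFileSystem";

    // Returns the implementation registered for the URI's scheme, or the fallback.
    static String resolve(Map<String, String> registered, URI uri) {
        return registered.getOrDefault(uri.getScheme(), FALLBACK);
    }

    public static void main(String[] args) {
        URI target = URI.create("s3://example-bucket/output"); // hypothetical bucket

        Map<String, String> registered = new HashMap<>();
        registered.put("file", "LocalFileSystem");

        // No factory for "s3" is registered, so the fallback is chosen.
        System.out.println(resolve(registered, target)); // prints HadoopFileSystem

        // Once an S3 implementation is registered, it wins over the fallback.
        registered.put("s3", "S3FileSystem");
        System.out.println(resolve(registered, target)); // prints S3FileSystem
    }
}
```

In the failing case, the fallback is what ends up asked for a recoverable writer, and the stack trace shows HadoopRecoverableWriter rejecting the request.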
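You can check whether your file systems are being loaded by enabling the DEBUG logging level for your Flink application, which EMR lets you do in the cluster configuration: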
[
  {
    "Classification": "flink-log4j",
    "Properties": {
      "log4j.rootLogger": "DEBUG,file"
    }
  },
  {
    "Classification": "flink-log4j-yarn-session",
    "Properties": {
      "log4j.rootLogger": "DEBUG,stdout"
    }
  }
]
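The relevant logs are available in the YARN ResourceManager; look at the logs for the individual nodes. Searching for the string "Added file system" should help you find every file system that loaded successfully.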
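If you have copied a node's log to a local file, the search can be done with a few lines of Java. This is a minimal sketch: the class name is hypothetical, the log path is whatever you pass on the command line, and only the search string itself comes from the description above.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.List;
import java.util.stream.Collectors;

class AddedFileSystemGrep {

    // The marker string to search for in the DEBUG logs.
    static final String MARKER = "Added file system";

    // Keeps only the lines that contain the marker.
    static List<String> matching(List<String> lines) {
        return lines.stream()
                .filter(line -> line.contains(MARKER))
                .collect(Collectors.toList());
    }

    public static void main(String[] args) throws IOException {
        if (args.length != 1) {
            System.err.println("usage: AddedFileSystemGrep <path-to-log-file>");
            return;
        }
        // Print every matching line from the given log file.
        matching(Files.readAllLines(Paths.get(args[0]))).forEach(System.out::println);
    }
}
```

If no line mentions the s3 scheme, that suggests no S3 file system was registered when the job started.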
It was also handy during this investigation to SSH into the master node and use the Flink Scala REPL, where I could see which file system Flink chose to load for a given file URI.
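The solution is to put the JAR for the S3 file system implementation into /usr/lib/flink/lib/ before the Flink application starts. This can be done with a bootstrap action that fetches flink-s3-fs-hadoop or flink-s3-fs-presto (depending on which implementation you use). My bootstrap action script looks something like this: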
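sudo mkdir -p /usr/lib/flink/lib
cd /usr/lib/flink/lib
# Quote the URL so the shell leaves "?" alone, follow redirects, and name the output file explicitly.
sudo curl -L -o flink-s3-fs-hadoop-1.8.1.jar "https://search.maven.org/remotecontent?filepath=org/apache/flink/flink-s3-fs-hadoop/1.8.1/flink-s3-fs-hadoop-1.8.1.jar"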
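To confirm that the bootstrap action left the JAR where Flink looks for it, a small check along these lines may help. It is a sketch, not part of the original script: the class name is hypothetical, and the glob simply assumes the JAR keeps its Maven file name.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.List;

class S3JarCheck {

    // Lists file names in libDir that look like a Flink S3 file system JAR.
    static List<String> findS3FsJars(Path libDir) {
        List<String> found = new ArrayList<>();
        if (!Files.isDirectory(libDir)) {
            return found; // directory missing: nothing installed
        }
        try (DirectoryStream<Path> jars = Files.newDirectoryStream(libDir, "flink-s3-fs-*.jar")) {
            for (Path jar : jars) {
                found.add(jar.getFileName().toString());
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return found;
    }

    public static void main(String[] args) {
        // Defaults to the directory used by the bootstrap script above.
        Path libDir = Paths.get(args.length > 0 ? args[0] : "/usr/lib/flink/lib");
        List<String> jars = findS3FsJars(libDir);
        System.out.println(jars.isEmpty()
                ? "No flink-s3-fs-*.jar found in " + libDir
                : "Found: " + jars);
    }
}
```

Run it on the master node after the cluster comes up; an empty result means the bootstrap action did not place the JAR in that directory.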