如何配置 Flink 将 S3 用于后端状态和检查点

How to configure Flink to use S3 for backend state and checkpoints

我有一个 Flink v1.2、3 个 JobManager、2 个 TaskManager 的设置。我想使用 S3 存储桶而不是 hdfs 作为后端状态和检查点以及 zookeeper storageDir

fs.s3.accessKey: [accessKey]
fs.s3.secretKey: [secretKey]

state.backend: filesystem

state.backend.fs.checkpointdir: s3:///[bucket]/flink-checkpoints
state.checkpoints.dir: s3:///[bucket]/external-checkpoints
high-availability: zookeeper
high-availability.zookeeper.storageDir: s3:///[bucket]/recovery

在我记录的 JobManager 中我有

2017-03-22 09:52:40,971 ERROR org.apache.flink.runtime.jobmanager.JobManager                - Error while starting up JobManager
java.io.IOException: No file system found with scheme s3, referenced in file URI 's3:///[bucket]/recovery/blob'.
        at org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:276)
        at org.apache.flink.core.fs.FileSystem.get(FileSystem.java:310)
        at org.apache.flink.runtime.blob.FileSystemBlobStore.<init>(FileSystemBlobStore.java:67)
        at org.apache.flink.runtime.blob.BlobServer.<init>(BlobServer.java:114)
        at org.apache.flink.runtime.jobmanager.JobManager$.createJobManagerComponents(JobManager.scala:2488)
        at org.apache.flink.runtime.jobmanager.JobManager$.startJobManagerActors(JobManager.scala:2643)
        at org.apache.flink.runtime.jobmanager.JobManager$.startJobManagerActors(JobManager.scala:2595)
        at org.apache.flink.runtime.jobmanager.JobManager$.startActorSystemAndJobManagerActors(JobManager.scala:2242)
        at org.apache.flink.runtime.jobmanager.JobManager$.liftedTree3(JobManager.scala:2020)
        at org.apache.flink.runtime.jobmanager.JobManager$.runJobManager(JobManager.scala:2019)
        at org.apache.flink.runtime.jobmanager.JobManager$$anonfun.apply$mcV$sp(JobManager.scala:2098)
        at org.apache.flink.runtime.jobmanager.JobManager$$anonfun.apply(JobManager.scala:2076)
        at org.apache.flink.runtime.jobmanager.JobManager$$anonfun.apply(JobManager.scala:2076)
        at scala.util.Try$.apply(Try.scala:192)
        at org.apache.flink.runtime.jobmanager.JobManager$.retryOnBindException(JobManager.scala:2131)
        at org.apache.flink.runtime.jobmanager.JobManager$.runJobManager(JobManager.scala:2076)
        at org.apache.flink.runtime.jobmanager.JobManager$$anon.call(JobManager.scala:1971)
        at org.apache.flink.runtime.jobmanager.JobManager$$anon.call(JobManager.scala:1969)
        at org.apache.flink.runtime.security.HadoopSecurityContext.run(HadoopSecurityContext.java:43)
        at java.security.AccessController.doPrivileged(Native Method)
        at javax.security.auth.Subject.doAs(Subject.java:422)
        at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1548)
        at org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:40)
        at org.apache.flink.runtime.jobmanager.JobManager$.main(JobManager.scala:1969)
        at org.apache.flink.runtime.jobmanager.JobManager.main(JobManager.scala)
2017-03-22 09:52:40,981 WARN  org.apache.hadoop.security.UserGroupInformation               - PriviledgedActionException as:ubuntu (auth:SIMPLE) cause:java.io.IOException: No file system found with scheme s3, referenced in file URI 's3:///[bucket]/recovery/blob'.
2017-03-22 09:52:40,981 ERROR org.apache.flink.runtime.jobmanager.JobManager                - Failed to run JobManager.
java.io.IOException: No file system found with scheme s3, referenced in file URI 's3:///[bucket]/recovery/blob'.
        at org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:276)
        at org.apache.flink.core.fs.FileSystem.get(FileSystem.java:310)
        at org.apache.flink.runtime.blob.FileSystemBlobStore.<init>(FileSystemBlobStore.java:67)
        at org.apache.flink.runtime.blob.BlobServer.<init>(BlobServer.java:114)
        at org.apache.flink.runtime.jobmanager.JobManager$.createJobManagerComponents(JobManager.scala:2488)
        at org.apache.flink.runtime.jobmanager.JobManager$.startJobManagerActors(JobManager.scala:2643)
        at org.apache.flink.runtime.jobmanager.JobManager$.startJobManagerActors(JobManager.scala:2595)
        at org.apache.flink.runtime.jobmanager.JobManager$.startActorSystemAndJobManagerActors(JobManager.scala:2242)
        at org.apache.flink.runtime.jobmanager.JobManager$.liftedTree3(JobManager.scala:2020)
        at org.apache.flink.runtime.jobmanager.JobManager$.runJobManager(JobManager.scala:2019)
        at org.apache.flink.runtime.jobmanager.JobManager$$anonfun.apply$mcV$sp(JobManager.scala:2098)
        at org.apache.flink.runtime.jobmanager.JobManager$$anonfun.apply(JobManager.scala:2076)
        at org.apache.flink.runtime.jobmanager.JobManager$$anonfun.apply(JobManager.scala:2076)
        at scala.util.Try$.apply(Try.scala:192)
        at org.apache.flink.runtime.jobmanager.JobManager$.retryOnBindException(JobManager.scala:2131)
        at org.apache.flink.runtime.jobmanager.JobManager$.runJobManager(JobManager.scala:2076)
        at org.apache.flink.runtime.jobmanager.JobManager$$anon.call(JobManager.scala:1971)
        at org.apache.flink.runtime.jobmanager.JobManager$$anon.call(JobManager.scala:1969)
        at org.apache.flink.runtime.security.HadoopSecurityContext.run(HadoopSecurityContext.java:43)
        at java.security.AccessController.doPrivileged(Native Method)
        at javax.security.auth.Subject.doAs(Subject.java:422)
        at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1548)
        at org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:40)
        at org.apache.flink.runtime.jobmanager.JobManager$.main(JobManager.scala:1969)
        at org.apache.flink.runtime.jobmanager.JobManager.main(JobManager.scala)
2017-03-22 09:52:40,983 INFO  akka.remote.RemoteActorRefProvider$RemotingTerminator         - Shutting down remote daemon.
2017-03-22 09:52:40,993 INFO  akka.remote.RemoteActorRefProvider$RemotingTerminator         - Remote daemon shut down; proceeding with flushing remote transports.

我没有安装hadoop。不确定这是否需要以及是否应该如何/在哪里安装/配置?

编辑:在使用以下 hadoop xml (core-site.xml) 配置 Flink 后,我不太了解 IAM 部分并且我没有使用 EMR,我自己安装了集群(在AWS) 能够在不依赖镜像的情况下更新 Flink:

<configuration>
    <property>
        <name>fs.defaultFS</name>
        <value>s3://[ bucket ] </value>
    </property>

    <property>
        <name>fs.s3a.access.key</name>
        <description>[ Access Key ]</description>
    </property>

    <property>
        <name>fs.s3a.secret.key</name>
        <description>[ Secret Key ]</description>
    </property>

    <property>
        <name>fs.s3.awsAccessKeyId</name>
        <description>[ Access Key ]</description>
    </property>

    <property>
        <name>fs.s3.awsSecretAccessKey</name>
        <description>[ Secret Key ]</description>
    </property>

    <property>
        <name>fs.s3n.awsAccessKeyId</name>
        <value>[ Access Key ]</value>
    </property>

    <property>
        <name>fs.s3n.awsSecretAccessKey</name>
        <value>[ Secret Key ]</value>
    </property>

    <property>
        <name>fs.s3.impl</name>
        <value>org.apache.hadoop.fs.s3a.S3AFileSystem</value>
    </property>

    <!-- Comma separated list of local directories used to buffer
         large results prior to transmitting them to S3. -->
    <property>
        <name>fs.s3.buffer.dir</name>
        <value>/tmp</value>
    </property>
</configuration>

我收到此错误:

  2017-03-24 11:20:17,760 ERROR org.apache.flink.runtime.jobmanager.JobManager                - Error while starting up JobManager
com.amazonaws.AmazonClientException: Unable to load AWS credentials from any provider in the chain
        at com.amazonaws.auth.AWSCredentialsProviderChain.getCredentials(AWSCredentialsProviderChain.java:117)
        at com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:3521)
        at com.amazonaws.services.s3.AmazonS3Client.headBucket(AmazonS3Client.java:1031)
        at com.amazonaws.services.s3.AmazonS3Client.doesBucketExist(AmazonS3Client.java:994)
        at org.apache.hadoop.fs.s3a.S3AFileSystem.initialize(S3AFileSystem.java:297)
        at org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.initialize(HadoopFileSystem.java:303)
        at org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:271)
        at org.apache.flink.core.fs.FileSystem.get(FileSystem.java:310)
        at org.apache.flink.runtime.blob.FileSystemBlobStore.<init>(FileSystemBlobStore.java:67)
        at org.apache.flink.runtime.blob.BlobServer.<init>(BlobServer.java:114)
        at org.apache.flink.runtime.jobmanager.JobManager$.createJobManagerComponents(JobManager.scala:2488)
        at org.apache.flink.runtime.jobmanager.JobManager$.startJobManagerActors(JobManager.scala:2643)
        at org.apache.flink.runtime.jobmanager.JobManager$.startJobManagerActors(JobManager.scala:2595)
        at org.apache.flink.runtime.jobmanager.JobManager$.startActorSystemAndJobManagerActors(JobManager.scala:2242)
        at org.apache.flink.runtime.jobmanager.JobManager$.liftedTree3(JobManager.scala:2020)
        at org.apache.flink.runtime.jobmanager.JobManager$.runJobManager(JobManager.scala:2019)
        at org.apache.flink.runtime.jobmanager.JobManager$$anonfun.apply$mcV$sp(JobManager.scala:2098)
        at org.apache.flink.runtime.jobmanager.JobManager$$anonfun.apply(JobManager.scala:2076)
        at org.apache.flink.runtime.jobmanager.JobManager$$anonfun.apply(JobManager.scala:2076)
        at scala.util.Try$.apply(Try.scala:192)
        at org.apache.flink.runtime.jobmanager.JobManager$.retryOnBindException(JobManager.scala:2131)
        at org.apache.flink.runtime.jobmanager.JobManager$.runJobManager(JobManager.scala:2076)
        at org.apache.flink.runtime.jobmanager.JobManager$$anon.call(JobManager.scala:1971)
        at org.apache.flink.runtime.jobmanager.JobManager$$anon.call(JobManager.scala:1969)
        at org.apache.flink.runtime.security.HadoopSecurityContext.run(HadoopSecurityContext.java:43)
        at java.security.AccessController.doPrivileged(Native Method)
        at javax.security.auth.Subject.doAs(Subject.java:422)
        at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1548)
        at org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:40)
        at org.apache.flink.runtime.jobmanager.JobManager$.main(JobManager.scala:1969)
        at org.apache.flink.runtime.jobmanager.JobManager.main(JobManager.scala)

编辑:我的错误是我在描述字段中设置了键而不是值。

请查看 guide on running Flink with S3 如何设置 S3。

我认为您缺少的是带有 fs.s3.impl 配置键的 hadoop 配置文件。 即使你没有使用 Hadoop,你仍然需要使用 Hadoop 配置文件。

Robert,我建议人们使用 "s3a" 文件系统,如果安装了 hadoop-aws 和匹配的 amazon JAR,它将自动注册。 S3:// URL 在 ASF 代码库中已弃用,即使 EMR 仍在使用它们。

我在使用最新的 Flink 版本 (1.10.0) 和 s3 时遇到了同样的问题。在最新的 Flink 中,你需要做一些不同的处理。

所以请找到我提供的详细答案 .