如何配置 Flink 将 S3 用于后端状态和检查点
How to configure Flink to use S3 for backend state and checkpoints
我有一个 Flink v1.2、3 个 JobManager、2 个 TaskManager 的设置。我想使用 S3 存储桶而不是 hdfs 作为后端状态和检查点以及 zookeeper storageDir
fs.s3.accessKey: [accessKey]
fs.s3.secretKey: [secretKey]
state.backend: filesystem
state.backend.fs.checkpointdir: s3:///[bucket]/flink-checkpoints
state.checkpoints.dir: s3:///[bucket]/external-checkpoints
high-availability: zookeeper
high-availability.zookeeper.storageDir:
s3:///[bucket]/recovery
在我记录的 JobManager 中我有
2017-03-22 09:52:40,971 ERROR org.apache.flink.runtime.jobmanager.JobManager - Error while starting up JobManager
java.io.IOException: No file system found with scheme s3, referenced in file URI 's3:///[bucket]/recovery/blob'.
at org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:276)
at org.apache.flink.core.fs.FileSystem.get(FileSystem.java:310)
at org.apache.flink.runtime.blob.FileSystemBlobStore.<init>(FileSystemBlobStore.java:67)
at org.apache.flink.runtime.blob.BlobServer.<init>(BlobServer.java:114)
at org.apache.flink.runtime.jobmanager.JobManager$.createJobManagerComponents(JobManager.scala:2488)
at org.apache.flink.runtime.jobmanager.JobManager$.startJobManagerActors(JobManager.scala:2643)
at org.apache.flink.runtime.jobmanager.JobManager$.startJobManagerActors(JobManager.scala:2595)
at org.apache.flink.runtime.jobmanager.JobManager$.startActorSystemAndJobManagerActors(JobManager.scala:2242)
at org.apache.flink.runtime.jobmanager.JobManager$.liftedTree3(JobManager.scala:2020)
at org.apache.flink.runtime.jobmanager.JobManager$.runJobManager(JobManager.scala:2019)
at org.apache.flink.runtime.jobmanager.JobManager$$anonfun.apply$mcV$sp(JobManager.scala:2098)
at org.apache.flink.runtime.jobmanager.JobManager$$anonfun.apply(JobManager.scala:2076)
at org.apache.flink.runtime.jobmanager.JobManager$$anonfun.apply(JobManager.scala:2076)
at scala.util.Try$.apply(Try.scala:192)
at org.apache.flink.runtime.jobmanager.JobManager$.retryOnBindException(JobManager.scala:2131)
at org.apache.flink.runtime.jobmanager.JobManager$.runJobManager(JobManager.scala:2076)
at org.apache.flink.runtime.jobmanager.JobManager$$anon.call(JobManager.scala:1971)
at org.apache.flink.runtime.jobmanager.JobManager$$anon.call(JobManager.scala:1969)
at org.apache.flink.runtime.security.HadoopSecurityContext.run(HadoopSecurityContext.java:43)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1548)
at org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:40)
at org.apache.flink.runtime.jobmanager.JobManager$.main(JobManager.scala:1969)
at org.apache.flink.runtime.jobmanager.JobManager.main(JobManager.scala)
2017-03-22 09:52:40,981 WARN org.apache.hadoop.security.UserGroupInformation - PriviledgedActionException as:ubuntu (auth:SIMPLE) cause:java.io.IOException: No file system found with scheme s3, referenced in file URI 's3:///[bucket]/recovery/blob'.
2017-03-22 09:52:40,981 ERROR org.apache.flink.runtime.jobmanager.JobManager - Failed to run JobManager.
java.io.IOException: No file system found with scheme s3, referenced in file URI 's3:///[bucket]/recovery/blob'.
at org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:276)
at org.apache.flink.core.fs.FileSystem.get(FileSystem.java:310)
at org.apache.flink.runtime.blob.FileSystemBlobStore.<init>(FileSystemBlobStore.java:67)
at org.apache.flink.runtime.blob.BlobServer.<init>(BlobServer.java:114)
at org.apache.flink.runtime.jobmanager.JobManager$.createJobManagerComponents(JobManager.scala:2488)
at org.apache.flink.runtime.jobmanager.JobManager$.startJobManagerActors(JobManager.scala:2643)
at org.apache.flink.runtime.jobmanager.JobManager$.startJobManagerActors(JobManager.scala:2595)
at org.apache.flink.runtime.jobmanager.JobManager$.startActorSystemAndJobManagerActors(JobManager.scala:2242)
at org.apache.flink.runtime.jobmanager.JobManager$.liftedTree3(JobManager.scala:2020)
at org.apache.flink.runtime.jobmanager.JobManager$.runJobManager(JobManager.scala:2019)
at org.apache.flink.runtime.jobmanager.JobManager$$anonfun.apply$mcV$sp(JobManager.scala:2098)
at org.apache.flink.runtime.jobmanager.JobManager$$anonfun.apply(JobManager.scala:2076)
at org.apache.flink.runtime.jobmanager.JobManager$$anonfun.apply(JobManager.scala:2076)
at scala.util.Try$.apply(Try.scala:192)
at org.apache.flink.runtime.jobmanager.JobManager$.retryOnBindException(JobManager.scala:2131)
at org.apache.flink.runtime.jobmanager.JobManager$.runJobManager(JobManager.scala:2076)
at org.apache.flink.runtime.jobmanager.JobManager$$anon.call(JobManager.scala:1971)
at org.apache.flink.runtime.jobmanager.JobManager$$anon.call(JobManager.scala:1969)
at org.apache.flink.runtime.security.HadoopSecurityContext.run(HadoopSecurityContext.java:43)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1548)
at org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:40)
at org.apache.flink.runtime.jobmanager.JobManager$.main(JobManager.scala:1969)
at org.apache.flink.runtime.jobmanager.JobManager.main(JobManager.scala)
2017-03-22 09:52:40,983 INFO akka.remote.RemoteActorRefProvider$RemotingTerminator - Shutting down remote daemon.
2017-03-22 09:52:40,993 INFO akka.remote.RemoteActorRefProvider$RemotingTerminator - Remote daemon shut down; proceeding with flushing remote transports.
我没有安装hadoop。不确定这是否需要以及是否应该如何/在哪里安装/配置?
编辑:在使用以下 hadoop xml (core-site.xml) 配置 Flink 后,我不太了解 IAM 部分并且我没有使用 EMR,我自己安装了集群(在AWS) 能够在不依赖镜像的情况下更新 Flink:
<configuration>
<property>
<name>fs.defaultFS</name>
<value>s3://[ bucket ] </value>
</property>
<property>
<name>fs.s3a.access.key</name>
<description>[ Access Key ]</description>
</property>
<property>
<name>fs.s3a.secret.key</name>
<description>[ Secret Key ]</description>
</property>
<property>
<name>fs.s3.awsAccessKeyId</name>
<description>[ Access Key ]</description>
</property>
<property>
<name>fs.s3.awsSecretAccessKey</name>
<description>[ Secret Key ]</description>
</property>
<property>
<name>fs.s3n.awsAccessKeyId</name>
<value>[ Access Key ]</value>
</property>
<property>
<name>fs.s3n.awsSecretAccessKey</name>
<value>[ Secret Key ]</value>
</property>
<property>
<name>fs.s3.impl</name>
<value>org.apache.hadoop.fs.s3a.S3AFileSystem</value>
</property>
<!-- Comma separated list of local directories used to buffer
large results prior to transmitting them to S3. -->
<property>
<name>fs.s3.buffer.dir</name>
<value>/tmp</value>
</property>
</configuration>
我收到此错误:
2017-03-24 11:20:17,760 ERROR org.apache.flink.runtime.jobmanager.JobManager - Error while starting up JobManager
com.amazonaws.AmazonClientException: Unable to load AWS credentials from any provider in the chain
at com.amazonaws.auth.AWSCredentialsProviderChain.getCredentials(AWSCredentialsProviderChain.java:117)
at com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:3521)
at com.amazonaws.services.s3.AmazonS3Client.headBucket(AmazonS3Client.java:1031)
at com.amazonaws.services.s3.AmazonS3Client.doesBucketExist(AmazonS3Client.java:994)
at org.apache.hadoop.fs.s3a.S3AFileSystem.initialize(S3AFileSystem.java:297)
at org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.initialize(HadoopFileSystem.java:303)
at org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:271)
at org.apache.flink.core.fs.FileSystem.get(FileSystem.java:310)
at org.apache.flink.runtime.blob.FileSystemBlobStore.<init>(FileSystemBlobStore.java:67)
at org.apache.flink.runtime.blob.BlobServer.<init>(BlobServer.java:114)
at org.apache.flink.runtime.jobmanager.JobManager$.createJobManagerComponents(JobManager.scala:2488)
at org.apache.flink.runtime.jobmanager.JobManager$.startJobManagerActors(JobManager.scala:2643)
at org.apache.flink.runtime.jobmanager.JobManager$.startJobManagerActors(JobManager.scala:2595)
at org.apache.flink.runtime.jobmanager.JobManager$.startActorSystemAndJobManagerActors(JobManager.scala:2242)
at org.apache.flink.runtime.jobmanager.JobManager$.liftedTree3(JobManager.scala:2020)
at org.apache.flink.runtime.jobmanager.JobManager$.runJobManager(JobManager.scala:2019)
at org.apache.flink.runtime.jobmanager.JobManager$$anonfun.apply$mcV$sp(JobManager.scala:2098)
at org.apache.flink.runtime.jobmanager.JobManager$$anonfun.apply(JobManager.scala:2076)
at org.apache.flink.runtime.jobmanager.JobManager$$anonfun.apply(JobManager.scala:2076)
at scala.util.Try$.apply(Try.scala:192)
at org.apache.flink.runtime.jobmanager.JobManager$.retryOnBindException(JobManager.scala:2131)
at org.apache.flink.runtime.jobmanager.JobManager$.runJobManager(JobManager.scala:2076)
at org.apache.flink.runtime.jobmanager.JobManager$$anon.call(JobManager.scala:1971)
at org.apache.flink.runtime.jobmanager.JobManager$$anon.call(JobManager.scala:1969)
at org.apache.flink.runtime.security.HadoopSecurityContext.run(HadoopSecurityContext.java:43)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1548)
at org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:40)
at org.apache.flink.runtime.jobmanager.JobManager$.main(JobManager.scala:1969)
at org.apache.flink.runtime.jobmanager.JobManager.main(JobManager.scala)
编辑:我的错误是我在描述字段中设置了键而不是值。
请查看 guide on running Flink with S3 如何设置 S3。
我认为您缺少的是带有 fs.s3.impl 配置键的 hadoop 配置文件。
即使你没有使用 Hadoop,你仍然需要使用 Hadoop 配置文件。
Robert,我建议人们使用 "s3a" 文件系统,如果安装了 hadoop-aws 和匹配的 amazon JAR,它将自动注册。 S3:// URL 在 ASF 代码库中已弃用,即使 EMR 仍在使用它们。
我在使用最新的 Flink 版本 (1.10.0) 和 s3 时遇到了同样的问题。在最新的 Flink 中,你需要做一些不同的处理。
所以请找到我提供的详细答案 .
我有一个 Flink v1.2、3 个 JobManager、2 个 TaskManager 的设置。我想使用 S3 存储桶而不是 hdfs 作为后端状态和检查点以及 zookeeper storageDir
fs.s3.accessKey: [accessKey]
fs.s3.secretKey: [secretKey]state.backend: filesystem
state.backend.fs.checkpointdir: s3:///[bucket]/flink-checkpoints
state.checkpoints.dir: s3:///[bucket]/external-checkpoints
high-availability: zookeeper
high-availability.zookeeper.storageDir: s3:///[bucket]/recovery
在我记录的 JobManager 中我有
2017-03-22 09:52:40,971 ERROR org.apache.flink.runtime.jobmanager.JobManager - Error while starting up JobManager
java.io.IOException: No file system found with scheme s3, referenced in file URI 's3:///[bucket]/recovery/blob'.
at org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:276)
at org.apache.flink.core.fs.FileSystem.get(FileSystem.java:310)
at org.apache.flink.runtime.blob.FileSystemBlobStore.<init>(FileSystemBlobStore.java:67)
at org.apache.flink.runtime.blob.BlobServer.<init>(BlobServer.java:114)
at org.apache.flink.runtime.jobmanager.JobManager$.createJobManagerComponents(JobManager.scala:2488)
at org.apache.flink.runtime.jobmanager.JobManager$.startJobManagerActors(JobManager.scala:2643)
at org.apache.flink.runtime.jobmanager.JobManager$.startJobManagerActors(JobManager.scala:2595)
at org.apache.flink.runtime.jobmanager.JobManager$.startActorSystemAndJobManagerActors(JobManager.scala:2242)
at org.apache.flink.runtime.jobmanager.JobManager$.liftedTree3(JobManager.scala:2020)
at org.apache.flink.runtime.jobmanager.JobManager$.runJobManager(JobManager.scala:2019)
at org.apache.flink.runtime.jobmanager.JobManager$$anonfun.apply$mcV$sp(JobManager.scala:2098)
at org.apache.flink.runtime.jobmanager.JobManager$$anonfun.apply(JobManager.scala:2076)
at org.apache.flink.runtime.jobmanager.JobManager$$anonfun.apply(JobManager.scala:2076)
at scala.util.Try$.apply(Try.scala:192)
at org.apache.flink.runtime.jobmanager.JobManager$.retryOnBindException(JobManager.scala:2131)
at org.apache.flink.runtime.jobmanager.JobManager$.runJobManager(JobManager.scala:2076)
at org.apache.flink.runtime.jobmanager.JobManager$$anon.call(JobManager.scala:1971)
at org.apache.flink.runtime.jobmanager.JobManager$$anon.call(JobManager.scala:1969)
at org.apache.flink.runtime.security.HadoopSecurityContext.run(HadoopSecurityContext.java:43)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1548)
at org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:40)
at org.apache.flink.runtime.jobmanager.JobManager$.main(JobManager.scala:1969)
at org.apache.flink.runtime.jobmanager.JobManager.main(JobManager.scala)
2017-03-22 09:52:40,981 WARN org.apache.hadoop.security.UserGroupInformation - PriviledgedActionException as:ubuntu (auth:SIMPLE) cause:java.io.IOException: No file system found with scheme s3, referenced in file URI 's3:///[bucket]/recovery/blob'.
2017-03-22 09:52:40,981 ERROR org.apache.flink.runtime.jobmanager.JobManager - Failed to run JobManager.
java.io.IOException: No file system found with scheme s3, referenced in file URI 's3:///[bucket]/recovery/blob'.
at org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:276)
at org.apache.flink.core.fs.FileSystem.get(FileSystem.java:310)
at org.apache.flink.runtime.blob.FileSystemBlobStore.<init>(FileSystemBlobStore.java:67)
at org.apache.flink.runtime.blob.BlobServer.<init>(BlobServer.java:114)
at org.apache.flink.runtime.jobmanager.JobManager$.createJobManagerComponents(JobManager.scala:2488)
at org.apache.flink.runtime.jobmanager.JobManager$.startJobManagerActors(JobManager.scala:2643)
at org.apache.flink.runtime.jobmanager.JobManager$.startJobManagerActors(JobManager.scala:2595)
at org.apache.flink.runtime.jobmanager.JobManager$.startActorSystemAndJobManagerActors(JobManager.scala:2242)
at org.apache.flink.runtime.jobmanager.JobManager$.liftedTree3(JobManager.scala:2020)
at org.apache.flink.runtime.jobmanager.JobManager$.runJobManager(JobManager.scala:2019)
at org.apache.flink.runtime.jobmanager.JobManager$$anonfun.apply$mcV$sp(JobManager.scala:2098)
at org.apache.flink.runtime.jobmanager.JobManager$$anonfun.apply(JobManager.scala:2076)
at org.apache.flink.runtime.jobmanager.JobManager$$anonfun.apply(JobManager.scala:2076)
at scala.util.Try$.apply(Try.scala:192)
at org.apache.flink.runtime.jobmanager.JobManager$.retryOnBindException(JobManager.scala:2131)
at org.apache.flink.runtime.jobmanager.JobManager$.runJobManager(JobManager.scala:2076)
at org.apache.flink.runtime.jobmanager.JobManager$$anon.call(JobManager.scala:1971)
at org.apache.flink.runtime.jobmanager.JobManager$$anon.call(JobManager.scala:1969)
at org.apache.flink.runtime.security.HadoopSecurityContext.run(HadoopSecurityContext.java:43)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1548)
at org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:40)
at org.apache.flink.runtime.jobmanager.JobManager$.main(JobManager.scala:1969)
at org.apache.flink.runtime.jobmanager.JobManager.main(JobManager.scala)
2017-03-22 09:52:40,983 INFO akka.remote.RemoteActorRefProvider$RemotingTerminator - Shutting down remote daemon.
2017-03-22 09:52:40,993 INFO akka.remote.RemoteActorRefProvider$RemotingTerminator - Remote daemon shut down; proceeding with flushing remote transports.
我没有安装hadoop。不确定这是否需要以及是否应该如何/在哪里安装/配置?
编辑:在使用以下 hadoop xml (core-site.xml) 配置 Flink 后,我不太了解 IAM 部分并且我没有使用 EMR,我自己安装了集群(在AWS) 能够在不依赖镜像的情况下更新 Flink:
<configuration>
<property>
<name>fs.defaultFS</name>
<value>s3://[ bucket ] </value>
</property>
<property>
<name>fs.s3a.access.key</name>
<description>[ Access Key ]</description>
</property>
<property>
<name>fs.s3a.secret.key</name>
<description>[ Secret Key ]</description>
</property>
<property>
<name>fs.s3.awsAccessKeyId</name>
<description>[ Access Key ]</description>
</property>
<property>
<name>fs.s3.awsSecretAccessKey</name>
<description>[ Secret Key ]</description>
</property>
<property>
<name>fs.s3n.awsAccessKeyId</name>
<value>[ Access Key ]</value>
</property>
<property>
<name>fs.s3n.awsSecretAccessKey</name>
<value>[ Secret Key ]</value>
</property>
<property>
<name>fs.s3.impl</name>
<value>org.apache.hadoop.fs.s3a.S3AFileSystem</value>
</property>
<!-- Comma separated list of local directories used to buffer
large results prior to transmitting them to S3. -->
<property>
<name>fs.s3.buffer.dir</name>
<value>/tmp</value>
</property>
</configuration>
我收到此错误:
2017-03-24 11:20:17,760 ERROR org.apache.flink.runtime.jobmanager.JobManager - Error while starting up JobManager
com.amazonaws.AmazonClientException: Unable to load AWS credentials from any provider in the chain
at com.amazonaws.auth.AWSCredentialsProviderChain.getCredentials(AWSCredentialsProviderChain.java:117)
at com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:3521)
at com.amazonaws.services.s3.AmazonS3Client.headBucket(AmazonS3Client.java:1031)
at com.amazonaws.services.s3.AmazonS3Client.doesBucketExist(AmazonS3Client.java:994)
at org.apache.hadoop.fs.s3a.S3AFileSystem.initialize(S3AFileSystem.java:297)
at org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.initialize(HadoopFileSystem.java:303)
at org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:271)
at org.apache.flink.core.fs.FileSystem.get(FileSystem.java:310)
at org.apache.flink.runtime.blob.FileSystemBlobStore.<init>(FileSystemBlobStore.java:67)
at org.apache.flink.runtime.blob.BlobServer.<init>(BlobServer.java:114)
at org.apache.flink.runtime.jobmanager.JobManager$.createJobManagerComponents(JobManager.scala:2488)
at org.apache.flink.runtime.jobmanager.JobManager$.startJobManagerActors(JobManager.scala:2643)
at org.apache.flink.runtime.jobmanager.JobManager$.startJobManagerActors(JobManager.scala:2595)
at org.apache.flink.runtime.jobmanager.JobManager$.startActorSystemAndJobManagerActors(JobManager.scala:2242)
at org.apache.flink.runtime.jobmanager.JobManager$.liftedTree3(JobManager.scala:2020)
at org.apache.flink.runtime.jobmanager.JobManager$.runJobManager(JobManager.scala:2019)
at org.apache.flink.runtime.jobmanager.JobManager$$anonfun.apply$mcV$sp(JobManager.scala:2098)
at org.apache.flink.runtime.jobmanager.JobManager$$anonfun.apply(JobManager.scala:2076)
at org.apache.flink.runtime.jobmanager.JobManager$$anonfun.apply(JobManager.scala:2076)
at scala.util.Try$.apply(Try.scala:192)
at org.apache.flink.runtime.jobmanager.JobManager$.retryOnBindException(JobManager.scala:2131)
at org.apache.flink.runtime.jobmanager.JobManager$.runJobManager(JobManager.scala:2076)
at org.apache.flink.runtime.jobmanager.JobManager$$anon.call(JobManager.scala:1971)
at org.apache.flink.runtime.jobmanager.JobManager$$anon.call(JobManager.scala:1969)
at org.apache.flink.runtime.security.HadoopSecurityContext.run(HadoopSecurityContext.java:43)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1548)
at org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:40)
at org.apache.flink.runtime.jobmanager.JobManager$.main(JobManager.scala:1969)
at org.apache.flink.runtime.jobmanager.JobManager.main(JobManager.scala)
编辑:我的错误是我在描述字段中设置了键而不是值。
请查看 guide on running Flink with S3 如何设置 S3。
我认为您缺少的是带有 fs.s3.impl 配置键的 hadoop 配置文件。 即使你没有使用 Hadoop,你仍然需要使用 Hadoop 配置文件。
Robert,我建议人们使用 "s3a" 文件系统,如果安装了 hadoop-aws 和匹配的 amazon JAR,它将自动注册。 S3:// URL 在 ASF 代码库中已弃用,即使 EMR 仍在使用它们。
我在使用最新的 Flink 版本 (1.10.0) 和 s3 时遇到了同样的问题。在最新的 Flink 中,你需要做一些不同的处理。
所以请找到我提供的详细答案