Apache Flink 将 S3 用于后端状态和检查点
Apache Flink to use S3 for backend state and checkpoints
背景
- 我打算使用 S3 来存储使用
FsStateBackend
的 Flink 的检查点。但不知何故我收到以下错误。
错误
org.apache.flink.core.fs.UnsupportedFileSystemSchemeException: Could not find a file system implementation for scheme 's3'. The scheme is not directly supported by Flink and no Hadoop file system to support this scheme could be loaded.
Flink版本:我使用的是Flink 1.10.0版本
我已经找到了上述问题的解决方案,所以在这里我列出了所需的步骤。
步骤
- 我们需要在下面列出的
flink-conf.yaml
文件中添加一些配置。
state.backend: filesystem
state.checkpoints.dir: s3://s3-bucket/checkpoints/ #"s3://<your-bucket>/<endpoint>"
state.backend.fs.checkpointdir: s3://s3-bucket/checkpoints/ #"s3://<your-bucket>/<endpoint>"
s3.access-key: XXXXXXXXXXXXXXXXXXX #your-access-key
s3.secret-key: xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx #your-secret-key
s3.endpoint: http://127.0.0.1:9000 #your-endpoint-hostname (I have used Minio)
完成第一步后,我们需要将相应的(flink-s3-fs-hadoop-1.10.0.jar
和flink-s3-fs-presto-1.10.0.jar
)JAR文件从opt目录复制到Flink的plugins目录。
- 例如:--> 1.复制
/flink-1.10.0/opt/flink-s3-fs-hadoop-1.10.0.jar
到/flink-1.10.0/plugins/s3-fs-hadoop/flink-s3-fs-hadoop-1.10.0.jar
//推荐用于StreamingFileSink
2. Copy /flink-1.10.0/opt/flink-s3-fs-presto-1.10.0.jar
to /flink-1.10.0/plugins/s3-fs-presto/flink-s3-fs-presto-1.10.0.jar
//推荐用于checkpointing
在检查点代码中添加这个
env.setStateBackend(new FsStateBackend("s3://s3-bucket/checkpoints/"))
- 完成以上所有步骤后re-start Flink if if it is already 运行.
注:
- 如果您在 Flink 中同时使用(
flink-s3-fs-hadoop
和 flink-s3-fs-presto
),那么请使用 s3p://
专门用于 flink-s3-fs-presto
和 s3a://
用于 flink-s3-fs-hadoop
而不是 s3://
.
- 有关详细信息,请单击 here。
背景
- 我打算使用 S3 来存储使用
FsStateBackend
的 Flink 的检查点。但不知何故我收到以下错误。
错误
org.apache.flink.core.fs.UnsupportedFileSystemSchemeException: Could not find a file system implementation for scheme 's3'. The scheme is not directly supported by Flink and no Hadoop file system to support this scheme could be loaded.
Flink版本:我使用的是Flink 1.10.0版本
我已经找到了上述问题的解决方案,所以在这里我列出了所需的步骤。
步骤
- 我们需要在下面列出的
flink-conf.yaml
文件中添加一些配置。
state.backend: filesystem
state.checkpoints.dir: s3://s3-bucket/checkpoints/ #"s3://<your-bucket>/<endpoint>"
state.backend.fs.checkpointdir: s3://s3-bucket/checkpoints/ #"s3://<your-bucket>/<endpoint>"
s3.access-key: XXXXXXXXXXXXXXXXXXX #your-access-key
s3.secret-key: xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx #your-secret-key
s3.endpoint: http://127.0.0.1:9000 #your-endpoint-hostname (I have used Minio)
完成第一步后,我们需要将相应的(
flink-s3-fs-hadoop-1.10.0.jar
和flink-s3-fs-presto-1.10.0.jar
)JAR文件从opt目录复制到Flink的plugins目录。- 例如:--> 1.复制
/flink-1.10.0/opt/flink-s3-fs-hadoop-1.10.0.jar
到/flink-1.10.0/plugins/s3-fs-hadoop/flink-s3-fs-hadoop-1.10.0.jar
//推荐用于StreamingFileSink
2. Copy/flink-1.10.0/opt/flink-s3-fs-presto-1.10.0.jar
to/flink-1.10.0/plugins/s3-fs-presto/flink-s3-fs-presto-1.10.0.jar
//推荐用于checkpointing
- 例如:--> 1.复制
在检查点代码中添加这个
env.setStateBackend(new FsStateBackend("s3://s3-bucket/checkpoints/"))
- 完成以上所有步骤后re-start Flink if if it is already 运行.
注:
- 如果您在 Flink 中同时使用(
flink-s3-fs-hadoop
和flink-s3-fs-presto
),那么请使用s3p://
专门用于flink-s3-fs-presto
和s3a://
用于flink-s3-fs-hadoop
而不是s3://
. - 有关详细信息,请单击 here。