是否可以将 Riak CS 与 Apache Flink 一起使用?
Is it possible to use Riak CS with Apache Flink?
我想配置 filesystem
状态后端和 zookeeper
恢复模式:
state.backend: filesystem
state.backend.fs.checkpointdir: ???
recovery.mode: zookeeper
recovery.zookeeper.storageDir: ???
如您所见,我应该指定 checkpointdir
和 storageDir
参数,但我没有 Apache Flink 支持的任何文件系统(如 HDFS 或 Amazon S3)。但是我安装了Riak CS集群(好像是compatible with S3)。
那么,我可以将 Riak CS 与 Apache Flink 一起使用吗?如果可能:如何配置 Apache Flink 以与 Riak CS 一起工作?
答:如何加入Apache Flink和Riak CS?
Riak CS 具有 S3(版本 2)兼容接口。因此,可以使用 Hadoop 的 S3 文件系统适配器与 Riak CS 一起工作。
我不知道为什么,但 Apache Flink 在 fat jar (lib/flink-dist_2.11-1.0.1.jar
) 中只有部分 Hadoop 文件系统适配器,即它有 FTP 文件系统 (org.apache.hadoop.fs.ftp.FTPFileSystem
) 但没有没有 S3 文件系统(即 org.apache.hadoop.fs.s3a.S3AFileSystem
)。所以,你有两种方法来解决这个问题:
- 使用 Hadoop 安装中的这些适配器。我没有尝试这个,但看起来你应该只配置 HADOOP_CLASSPATH 或 HADOOP_HOME evn 变量。
- monky 修补 Apache Flink 并将所需的 JAR 下载到
<flink home>/lib
目录
因此,我选择了第二种方式,因为不想在我的环境中配置 Hadoop。您可以从 Hadoop dist 或互联网复制 JAR:
curl http://central.maven.org/maven2/org/apache/hadoop/hadoop-aws/2.7.2/hadoop-aws-2.7.2.jar -o /flink/lib/hadoop-aws-2.7.2.jar
curl http://central.maven.org/maven2/com/amazonaws/aws-java-sdk/1.7.4/aws-java-sdk-1.7.4.jar -o /flink/lib/aws-java-sdk-1.7.4.jar
curl http://central.maven.org/maven2/org/apache/httpcomponents/httpcore/4.2.5/httpcore-4.2.5.jar -o /flink/lib/httpcore-4.2.5.jar
curl http://central.maven.org/maven2/org/apache/httpcomponents/httpclient/4.2.5/httpclient-4.2.5.jar -o /flink/lib/httpclient-4.2.5.jar
如您所见,我使用的是旧版本,因为在 Hadoop 2.7.2 中使用的是旧版本,而我使用的 Flink 与此版本的 Hadoop 兼容。
仅供参考:如果您在自己的流程中使用这些 JAR 的最新版本,此类 hack 可能会导致问题。为避免与不同版本相关的问题,您可以在使用 flow 构建 fat jar 时重新定位包,使用类似的东西(我正在使用 Gradle):
// Relocate org.apache.http packages because Apache Flink include old version of this library (we place them for using S3 compatible FS)
shadowJar {
dependencies {
include(dependency('.*:.*:.*'))
}
relocate 'org.apache.http', 'relocated.org.apache.http'
relocate 'org.apache.commons', 'relocated.org.apache.commons'
}
然后您应该在 flink-conf.yaml
中指定 core-site.xml
的路径,因为 Hadoop 兼容文件系统使用此配置加载设置:
...
fs.hdfs.hadoopconf: /flink/conf
...
如您所见,我只是将它放在 <fink home>/conf
目录中。它具有以下设置:
<?xml version="1.0" encoding="UTF-8" ?>
<configuration>
<property>
<name>fs.s3a.impl</name>
<value>org.apache.hadoop.fs.s3a.S3AFileSystem</value> // because S3A better then other: https://wiki.apache.org/hadoop/AmazonS3
</property>
<property>
<name>fs.s3a.endpoint</name>
<value>my-riak-cs.stage.local</value> // this is my Riak CS host
</property>
<property>
<name>fs.s3a.connection.ssl.enabled</name> // my Riak CS in staging doesn't support SSL
<value>false</value>
</property>
<property>
<name>fs.s3a.access.key</name>
<value>????</value> // this is my access key for Riak CS
</property>
<property>
<name>fs.s3a.secret.key</name>
<value>????</value> // this is my secret key for Riak CS
</property>
</configuration>
那么你应该在 flink-conf.yaml
中配置 Riak CS 桶作为推荐 here:
...
state.backend.fs.checkpointdir: s3a://example-staging-flink/checkpoints
...
recovery.zookeeper.storageDir: s3a://example-staging-flink/recovery
...
并在 Riak CS 中创建存储桶。我正在使用 s3cmd
(在我的 OS X 开发环境中安装在 brew
上):
s3cmd mb s3://example-staging-flink
仅供参考:在使用 s3cmd
之前,您应该使用 s3cmd --configure
配置它,然后修复 ~/.s3cmd
文件中的一些设置:
signature_v2 = True // because Riak CS using S3 V2 interface
use_https = False // if your don't use SSL
access_key = ???
secret_key = ???
host_base = my-riak-cs.stage.local // your Riak CS host
host_bucket = %(bucket).my-riak-cs.stage.local // format of bucket used by Riak CS
所以,这就是您应该为 Riak CS 中的独立 HA Apache Flink 集群的 save/restore 状态配置的所有内容。
我想配置 filesystem
状态后端和 zookeeper
恢复模式:
state.backend: filesystem
state.backend.fs.checkpointdir: ???
recovery.mode: zookeeper
recovery.zookeeper.storageDir: ???
如您所见,我应该指定 checkpointdir
和 storageDir
参数,但我没有 Apache Flink 支持的任何文件系统(如 HDFS 或 Amazon S3)。但是我安装了Riak CS集群(好像是compatible with S3)。
那么,我可以将 Riak CS 与 Apache Flink 一起使用吗?如果可能:如何配置 Apache Flink 以与 Riak CS 一起工作?
答:如何加入Apache Flink和Riak CS?
Riak CS 具有 S3(版本 2)兼容接口。因此,可以使用 Hadoop 的 S3 文件系统适配器与 Riak CS 一起工作。
我不知道为什么,但 Apache Flink 在 fat jar (lib/flink-dist_2.11-1.0.1.jar
) 中只有部分 Hadoop 文件系统适配器,即它有 FTP 文件系统 (org.apache.hadoop.fs.ftp.FTPFileSystem
) 但没有没有 S3 文件系统(即 org.apache.hadoop.fs.s3a.S3AFileSystem
)。所以,你有两种方法来解决这个问题:
- 使用 Hadoop 安装中的这些适配器。我没有尝试这个,但看起来你应该只配置 HADOOP_CLASSPATH 或 HADOOP_HOME evn 变量。
- monky 修补 Apache Flink 并将所需的 JAR 下载到
<flink home>/lib
目录
因此,我选择了第二种方式,因为不想在我的环境中配置 Hadoop。您可以从 Hadoop dist 或互联网复制 JAR:
curl http://central.maven.org/maven2/org/apache/hadoop/hadoop-aws/2.7.2/hadoop-aws-2.7.2.jar -o /flink/lib/hadoop-aws-2.7.2.jar
curl http://central.maven.org/maven2/com/amazonaws/aws-java-sdk/1.7.4/aws-java-sdk-1.7.4.jar -o /flink/lib/aws-java-sdk-1.7.4.jar
curl http://central.maven.org/maven2/org/apache/httpcomponents/httpcore/4.2.5/httpcore-4.2.5.jar -o /flink/lib/httpcore-4.2.5.jar
curl http://central.maven.org/maven2/org/apache/httpcomponents/httpclient/4.2.5/httpclient-4.2.5.jar -o /flink/lib/httpclient-4.2.5.jar
如您所见,我使用的是旧版本,因为在 Hadoop 2.7.2 中使用的是旧版本,而我使用的 Flink 与此版本的 Hadoop 兼容。
仅供参考:如果您在自己的流程中使用这些 JAR 的最新版本,此类 hack 可能会导致问题。为避免与不同版本相关的问题,您可以在使用 flow 构建 fat jar 时重新定位包,使用类似的东西(我正在使用 Gradle):
// Relocate org.apache.http packages because Apache Flink include old version of this library (we place them for using S3 compatible FS)
shadowJar {
dependencies {
include(dependency('.*:.*:.*'))
}
relocate 'org.apache.http', 'relocated.org.apache.http'
relocate 'org.apache.commons', 'relocated.org.apache.commons'
}
然后您应该在 flink-conf.yaml
中指定 core-site.xml
的路径,因为 Hadoop 兼容文件系统使用此配置加载设置:
...
fs.hdfs.hadoopconf: /flink/conf
...
如您所见,我只是将它放在 <fink home>/conf
目录中。它具有以下设置:
<?xml version="1.0" encoding="UTF-8" ?>
<configuration>
<property>
<name>fs.s3a.impl</name>
<value>org.apache.hadoop.fs.s3a.S3AFileSystem</value> // because S3A better then other: https://wiki.apache.org/hadoop/AmazonS3
</property>
<property>
<name>fs.s3a.endpoint</name>
<value>my-riak-cs.stage.local</value> // this is my Riak CS host
</property>
<property>
<name>fs.s3a.connection.ssl.enabled</name> // my Riak CS in staging doesn't support SSL
<value>false</value>
</property>
<property>
<name>fs.s3a.access.key</name>
<value>????</value> // this is my access key for Riak CS
</property>
<property>
<name>fs.s3a.secret.key</name>
<value>????</value> // this is my secret key for Riak CS
</property>
</configuration>
那么你应该在 flink-conf.yaml
中配置 Riak CS 桶作为推荐 here:
...
state.backend.fs.checkpointdir: s3a://example-staging-flink/checkpoints
...
recovery.zookeeper.storageDir: s3a://example-staging-flink/recovery
...
并在 Riak CS 中创建存储桶。我正在使用 s3cmd
(在我的 OS X 开发环境中安装在 brew
上):
s3cmd mb s3://example-staging-flink
仅供参考:在使用 s3cmd
之前,您应该使用 s3cmd --configure
配置它,然后修复 ~/.s3cmd
文件中的一些设置:
signature_v2 = True // because Riak CS using S3 V2 interface
use_https = False // if your don't use SSL
access_key = ???
secret_key = ???
host_base = my-riak-cs.stage.local // your Riak CS host
host_bucket = %(bucket).my-riak-cs.stage.local // format of bucket used by Riak CS
所以,这就是您应该为 Riak CS 中的独立 HA Apache Flink 集群的 save/restore 状态配置的所有内容。