Apache Flink AWS S3 Sink 是否需要 Hadoop 进行本地测试?

Does Apache Flink AWS S3 Sink require Hadoop for local testing?

我对 Apache Flink 比较陌生,我正在尝试创建一个简单的项目来生成一个文件到 AWS S3 存储桶。根据文档,我似乎需要安装 Hadoop 才能执行此操作。

如何设置我的本地环境来测试此功能?我已经在本地安装了 Apache Flink 和 Hadoop。我已将必要的更改添加到 Hadoop 的 core-site.xml 配置,并将我的 HADOOP_CONF 路径添加到我的 flink.yaml 配置。当我尝试通过 Flink 在本地提交作业时 UI 我总是收到错误

2016-12-29 16:03:49,861 INFO  org.apache.flink.util.NetUtils                                - Unable to allocate on port 6123, due to error: Address already in use
2016-12-29 16:03:49,862 ERROR org.apache.flink.runtime.jobmanager.JobManager                - Failed to run JobManager.
java.lang.RuntimeException: Unable to do further retries starting the actor system
    at org.apache.flink.runtime.jobmanager.JobManager$.retryOnBindException(JobManager.scala:2203)
    at org.apache.flink.runtime.jobmanager.JobManager$.runJobManager(JobManager.scala:2143)
    at org.apache.flink.runtime.jobmanager.JobManager$.main(JobManager.scala:2040)
    at org.apache.flink.runtime.jobmanager.JobManager.main(JobManager.scala)

我假设我在环境设置方面遗漏了一些东西。是否可以在本地执行此操作?任何帮助,将不胜感激。

虽然您需要 Hadoop 库,但您不必将 Hadoop 安装到 运行 本地并写入 S3。我只是碰巧通过编写基于 Avro 模式的 Parquet 输出来尝试这一点,并生成了 SpecificRecord 到 S3。我正在 运行 通过 SBT 和 Intellij Idea 在本地创建以下代码的一个版本。所需部件:

1) 使用以下文件指定所需的 Hadoop 属性(注意:不建议定义 AWS 访问 key/secret 密钥。最好在具有适当 IAM 角色的 EC2 实例上 运行 read/write 到您的 S3 存储桶。但需要在本地进行测试)

<configuration>
    <property>
        <name>fs.s3.impl</name>
        <value>org.apache.hadoop.fs.s3a.S3AFileSystem</value>
    </property>

    <!-- Comma separated list of local directories used to buffer
         large results prior to transmitting them to S3. -->
    <property>
        <name>fs.s3a.buffer.dir</name>
        <value>/tmp</value>
    </property>

    <!-- set your AWS ID using key defined in org.apache.hadoop.fs.s3a.Constants -->
    <property>
        <name>fs.s3a.access.key</name>
        <value>YOUR_ACCESS_KEY</value>
    </property>

    <!-- set your AWS access key -->
    <property>
        <name>fs.s3a.secret.key</name>
        <value>YOUR_SECRET_KEY</value>
    </property>
</configuration>

2) 进口: 导入 com.uebercomputing.eventrecord.EventOnlyRecord

import org.apache.flink.api.scala.hadoop.mapreduce.HadoopOutputFormat
import org.apache.flink.api.scala.{ExecutionEnvironment, _}

import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat
import org.apache.hadoop.conf.{Configuration => HadoopConfiguration}
import org.apache.hadoop.fs.Path
import org.apache.hadoop.mapreduce.Job

import org.apache.parquet.avro.AvroParquetOutputFormat

3) Flink 代码使用具有上述配置的 HadoopOutputFormat:

    val events: DataSet[(Void, EventOnlyRecord)] = ...

    val hadoopConfig = getHadoopConfiguration(hadoopConfigFile)

    val outputFormat = new AvroParquetOutputFormat[EventOnlyRecord]
    val outputJob = Job.getInstance

    //Note: AvroParquetOutputFormat extends FileOutputFormat[Void,T]
    //so key is Void, value of type T - EventOnlyRecord in this case
    val hadoopOutputFormat = new HadoopOutputFormat[Void, EventOnlyRecord](
      outputFormat,
      outputJob
    )

    val outputConfig = outputJob.getConfiguration
    outputConfig.addResource(hadoopConfig)
    val outputPath = new Path("s3://<bucket>/<dir-prefix>")
    FileOutputFormat.setOutputPath(outputJob, outputPath)
    AvroParquetOutputFormat.setSchema(outputJob, EventOnlyRecord.getClassSchema)

    events.output(hadoopOutputFormat)

    env.execute

    ...

    def getHadoopConfiguration(hadoodConfigPath: String): HadoopConfiguration = {
      val hadoopConfig = new HadoopConfiguration()
      hadoopConfig.addResource(new Path(hadoodConfigPath))
      hadoopConfig
    }

4) 构建依赖项和使用的版本:

    val awsSdkVersion = "1.7.4"
    val hadoopVersion = "2.7.3"
    val flinkVersion = "1.1.4"

    val flinkDependencies = Seq(
      ("org.apache.flink" %% "flink-scala" % flinkVersion),
      ("org.apache.flink" %% "flink-hadoop-compatibility" % flinkVersion)
    )

    val providedFlinkDependencies = flinkDependencies.map(_ % "provided")

    val serializationDependencies = Seq(
      ("org.apache.avro" % "avro" % "1.7.7"),
      ("org.apache.avro" % "avro-mapred" % "1.7.7").classifier("hadoop2"),
      ("org.apache.parquet" % "parquet-avro" % "1.8.1")
    )

    val s3Dependencies = Seq(
      ("com.amazonaws" % "aws-java-sdk" % awsSdkVersion),
      ("org.apache.hadoop" % "hadoop-aws" % hadoopVersion)
    )

编辑使用 writeAsText 到 S3:

1) 创建一个 Hadoop 配置目录(将其称为 hadoop-conf-dir),其中包含一个文件 core-site.xml。

例如:

mkdir /home/<user>/hadoop-config
cd /home/<user>/hadoop-config
vi core-site.xml

#content of core-site.xml 
<configuration>
    <property>
        <name>fs.s3.impl</name>
        <value>org.apache.hadoop.fs.s3a.S3AFileSystem</value>
    </property>

    <!-- Comma separated list of local directories used to buffer
         large results prior to transmitting them to S3. -->
    <property>
        <name>fs.s3a.buffer.dir</name>
        <value>/tmp</value>
    </property>

    <!-- set your AWS ID using key defined in org.apache.hadoop.fs.s3a.Constants -->
    <property>
        <name>fs.s3a.access.key</name>
        <value>YOUR_ACCESS_KEY</value>
    </property>

    <!-- set your AWS access key -->
    <property>
        <name>fs.s3a.secret.key</name>
        <value>YOUR_SECRET_KEY</value>
    </property>
</configuration>

2) 创建一个目录(将其称为 flink-conf-dir),其中包含一个文件 flink-conf.yaml。

例如:

mkdir /home/<user>/flink-config
cd /home/<user>/flink-config
vi flink-conf.yaml

//content of flink-conf.yaml - continuing earlier example
fs.hdfs.hadoopconf: /home/<user>/hadoop-config

3) 编辑用于 运行 S3 Flink 作业的 IntelliJ 运行 配置 - 运行 - 编辑配置 - 并添加以下环境变量:

FLINK_CONF_DIR and set it to your flink-conf-dir

Continuing the example above:
FLINK_CONF_DIR=/home/<user>/flink-config

4) 运行 设置了该环境变量的代码:

events.writeAsText("s3://<bucket>/<prefix-dir>")

env.execute

我必须对 运行 我在本地下沉到 S3 的 flink 作业执行以下操作:

1- 添加了 flink-s3-fs-hadoop-1.9.1.jar 到我的 flink/plugins/flink-s3-fs-hadoop 目录

2- 修改了 flink/conf/flink-conf.yaml 以包含 s3.access-键:AWS_ACCESS_KEY s3.secret-键:AWS_SECRET_KEY fs.hdfs.hadoopconf: /etc/hadoop-config

我在 hadoop-config 文件夹中有 core-site.xml 文件,但它不包含任何配置,因此可能不需要 fs.hdfs.hadoopconf。

在 sbt 中我只需要添加 S3 库依赖项就可以像本地文件系统一样使用它

SBT 文件:

"org.apache.flink" % "flink-s3-fs-hadoop" % flinkVersion.value

阅读范例:

    public static void main(String[] args) throws Exception {
    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    DataStream<String> text = env.readTextFile("s3://etl-data-ia/test/fileStreamTest.csv");
    text.print();
    env.execute("test");}

基于 link https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/deployment/filesystems/s3/#hadooppresto-s3-file-systems-plugins

To use flink-s3-fs-hadoop plugin you should copy the respective JAR file from the opt directory to the plugins directory of your Flink distribution before starting Flink.

我知道的另一种方法是通过环境变量启用它 ENABLE_BUILT_IN_PLUGINS="flink-s3-fs-hadoop-[flink-version].jar"

例如:flink-s3-fs-hadoop-1.12.2.jar

对于这两种方式,我们都必须在 flink-conf.yaml 文件中定义 S3 配置

Flink will internally translate this back to fs.s3a.connection.maximum. There is no need to pass configuration parameters using Hadoop’s XML configuration files.

s3.endpoint: <end-point>
s3.path.style.access : true

至于 AWS 凭据,它们必须在环境变量或中提供。配置在 flink-conf.yaml

s3.endpoint: <end-point>
s3.path.style.access : true
s3.access-key: <key>
s3.secret-key: <value>
s3.region: <region>

设置完成后,您可以像@EyalP 提到的那样从 S3 读取,或写入 S3(即使用数据集)

dataset.map(new MapToJsonString())
                .writeAsText("s3://....",
                        FileSystem.WriteMode.OVERWRITE);

如果您想在本地测试它(没有真正的 AWS 帐户)我建议您检查 localstack。它完全支持各种 AWS 服务(包括 S3)。如果你这样做,那么 AWS 凭证就不是必需的(可能会提供空的)并且端点将是本地堆栈本身。