如何将流式数据写入S3?

How to write streaming data to S3?

我想使用 Scala 在 Spark Streaming 中将 RDD[String] 写入 Amazon S3。这些基本上是 JSON 个字符串。不确定如何更有效地做到这一点。 我找到了 this post,其中使用了库 spark-s3。这个想法是创建 SparkContext 然后 SQLContext。在此之后 post 的作者做了这样的事情:

myDstream.foreachRDD { rdd =>
      rdd.toDF().write
                .format("com.knoldus.spark.s3")
                .option("accessKey","s3_access_key")
                .option("secretKey","s3_secret_key")
                .option("bucket","bucket_name")
                .option("fileType","json")
                .save("sample.json")
}

除了spark-s3还有哪些选择?是否可以附加 S3 上的文件与流数据?

你应该看看 Spark Documentation 中 dataframewriter 的模式方法:

public DataFrameWriter mode(SaveMode saveMode)

Specifies the behavior when data or table already exists. Options include: - SaveMode.Overwrite: overwrite the existing data. - SaveMode.Append: append the data. - SaveMode.Ignore: ignore the operation (i.e. no-op). - SaveMode.ErrorIfExists: default option, throw an exception at runtime.

您可以尝试使用 Append 保存模式进行类似操作。

rdd.toDF.write
        .format("json")
        .mode(SaveMode.Append)
        .saveAsTextFile("s3://iiiii/ttttt.json");

Spark Append:

Append mode means that when saving a DataFrame to a data source, if data/table already exists, contents of the DataFrame are expected to be appended to existing data.

基本上,您可以通过将 "format" 关键字传递给方法

来选择您想要的格式作为输出格式

public DataFrameWriter format(java.lang.String source)

Specifies the underlying output data source. Built-in options include "parquet", "json", etc.

例如 parquet:

df.write().format("parquet").save("yourfile.parquet")

json:

df.write().format("json").save("yourfile.json")


编辑:添加了有关 s3 凭据的详细信息:

设置凭据有两种不同的选项,我们可以在 SparkHadoopUtil.scala 中看到 使用环境变量 System.getenv("AWS_ACCESS_KEY_ID") 或使用 spark.hadoop.foo 属性:

SparkHadoopUtil.scala:
if (key.startsWith("spark.hadoop.")) {
          hadoopConf.set(key.substring("spark.hadoop.".length), value)
}

因此,您需要在 javaSparkContext.hadoopConfiguration() or scalaSparkContext.hadoopConfiguration 中获取 hadoopConfiguration 并设置

hadoopConfiguration.set("fs.s3.awsAccessKeyId", myAccessKey)
hadoopConfiguration.set("fs.s3.awsSecretAccessKey", mySecretKey)

S3 上的文件 cannot be appended。 "append" 表示在 S3 中用包含附加数据的新对象替换现有对象。