如何将流式数据写入S3?
How to write streaming data to S3?
我想使用 Scala 在 Spark Streaming 中将 RDD[String]
写入 Amazon S3。这些基本上是 JSON 个字符串。不确定如何更有效地做到这一点。
我找到了 this post,其中使用了库 spark-s3
。这个想法是创建 SparkContext
然后 SQLContext
。在此之后 post 的作者做了这样的事情:
myDstream.foreachRDD { rdd =>
rdd.toDF().write
.format("com.knoldus.spark.s3")
.option("accessKey","s3_access_key")
.option("secretKey","s3_secret_key")
.option("bucket","bucket_name")
.option("fileType","json")
.save("sample.json")
}
除了spark-s3
还有哪些选择?是否可以附加 S3 上的文件与流数据?
你应该看看 Spark Documentation 中 dataframewriter 的模式方法:
public DataFrameWriter mode(SaveMode saveMode)
Specifies the behavior when data or table already exists. Options
include: - SaveMode.Overwrite: overwrite the existing data. -
SaveMode.Append: append the data. - SaveMode.Ignore: ignore the
operation (i.e. no-op). - SaveMode.ErrorIfExists: default option,
throw an exception at runtime.
您可以尝试使用 Append 保存模式进行类似操作。
rdd.toDF.write
.format("json")
.mode(SaveMode.Append)
.saveAsTextFile("s3://iiiii/ttttt.json");
Append mode means that when saving a DataFrame to a data source, if
data/table already exists, contents of the DataFrame are expected to
be appended to existing data.
基本上,您可以通过将 "format" 关键字传递给方法
来选择您想要的格式作为输出格式
public DataFrameWriter format(java.lang.String source)
Specifies the underlying output data source. Built-in options include "parquet", "json", etc.
例如 parquet
:
df.write().format("parquet").save("yourfile.parquet")
或json
:
df.write().format("json").save("yourfile.json")
编辑:添加了有关 s3 凭据的详细信息:
设置凭据有两种不同的选项,我们可以在 SparkHadoopUtil.scala
中看到
使用环境变量 System.getenv("AWS_ACCESS_KEY_ID")
或使用 spark.hadoop.foo
属性:
SparkHadoopUtil.scala:
if (key.startsWith("spark.hadoop.")) {
hadoopConf.set(key.substring("spark.hadoop.".length), value)
}
因此,您需要在 javaSparkContext.hadoopConfiguration() or scalaSparkContext.hadoopConfiguration 中获取 hadoopConfiguration
并设置
hadoopConfiguration.set("fs.s3.awsAccessKeyId", myAccessKey)
hadoopConfiguration.set("fs.s3.awsSecretAccessKey", mySecretKey)
S3 上的文件 cannot be appended。 "append" 表示在 S3 中用包含附加数据的新对象替换现有对象。
我想使用 Scala 在 Spark Streaming 中将 RDD[String]
写入 Amazon S3。这些基本上是 JSON 个字符串。不确定如何更有效地做到这一点。
我找到了 this post,其中使用了库 spark-s3
。这个想法是创建 SparkContext
然后 SQLContext
。在此之后 post 的作者做了这样的事情:
myDstream.foreachRDD { rdd =>
rdd.toDF().write
.format("com.knoldus.spark.s3")
.option("accessKey","s3_access_key")
.option("secretKey","s3_secret_key")
.option("bucket","bucket_name")
.option("fileType","json")
.save("sample.json")
}
除了spark-s3
还有哪些选择?是否可以附加 S3 上的文件与流数据?
你应该看看 Spark Documentation 中 dataframewriter 的模式方法:
public DataFrameWriter mode(SaveMode saveMode)
Specifies the behavior when data or table already exists. Options include: - SaveMode.Overwrite: overwrite the existing data. - SaveMode.Append: append the data. - SaveMode.Ignore: ignore the operation (i.e. no-op). - SaveMode.ErrorIfExists: default option, throw an exception at runtime.
您可以尝试使用 Append 保存模式进行类似操作。
rdd.toDF.write
.format("json")
.mode(SaveMode.Append)
.saveAsTextFile("s3://iiiii/ttttt.json");
Append mode means that when saving a DataFrame to a data source, if data/table already exists, contents of the DataFrame are expected to be appended to existing data.
基本上,您可以通过将 "format" 关键字传递给方法
来选择您想要的格式作为输出格式
public DataFrameWriter format(java.lang.String source)
Specifies the underlying output data source. Built-in options include "parquet", "json", etc.
例如 parquet
:
df.write().format("parquet").save("yourfile.parquet")
或json
:
df.write().format("json").save("yourfile.json")
编辑:添加了有关 s3 凭据的详细信息:
设置凭据有两种不同的选项,我们可以在 SparkHadoopUtil.scala
中看到
使用环境变量 System.getenv("AWS_ACCESS_KEY_ID")
或使用 spark.hadoop.foo
属性:
SparkHadoopUtil.scala:
if (key.startsWith("spark.hadoop.")) {
hadoopConf.set(key.substring("spark.hadoop.".length), value)
}
因此,您需要在 javaSparkContext.hadoopConfiguration() or scalaSparkContext.hadoopConfiguration 中获取 hadoopConfiguration
并设置
hadoopConfiguration.set("fs.s3.awsAccessKeyId", myAccessKey)
hadoopConfiguration.set("fs.s3.awsSecretAccessKey", mySecretKey)
S3 上的文件 cannot be appended。 "append" 表示在 S3 中用包含附加数据的新对象替换现有对象。