如何将 RDD 数据保存到 json 文件中,而不是文件夹中
How to save RDD data into json files, not folders
我正在接收要保存在 S3 中的流数据 myDStream
(DStream[String]
)(基本上,对于这个问题,我想保存到哪里并不重要输出,但我提到它以防万一)。
以下代码运行良好,但它保存的文件夹名称类似于 jsonFile-19-45-46.json
,然后在文件夹内保存文件 _SUCCESS
和 part-00000
.
是否可以将每个 RDD[String]
(这些是 JSON 字符串)数据保存到 JSON 文件 而不是文件夹中?本以为repartition(1)
一定要使出这一招,没想到
myDStream.foreachRDD { rdd =>
// datetimeString = ....
rdd.repartition(1).saveAsTextFile("s3n://mybucket/keys/jsonFile-"+datetimeString+".json")
}
据我所知,没有将其另存为文件的选项。因为它是一个分布式处理框架,所以在单个文件上写入不是一个好习惯,而不是每个分区在指定路径中写入它自己的文件。
We can pass only output directory where we wanted to save the data. OutputWriter will create file(s)(depends on partitions) inside specified path with part-
file name prefix.
作为 rdd.collect.mkString("\n")
的替代方法,您可以使用 hadoop 文件系统库通过将 part-00000
文件移动到它的位置来清理输出。下面的代码在本地文件系统和 HDFS 上完美运行,但我无法使用 S3 对其进行测试:
val outputPath = "path/to/some/file.json"
rdd.saveAsTextFile(outputPath + "-tmp")
import org.apache.hadoop.fs.Path
val fs = org.apache.hadoop.fs.FileSystem.get(spark.sparkContext.hadoopConfiguration)
fs.rename(new Path(outputPath + "-tmp/part-00000"), new Path(outputPath))
fs.delete(new Path(outputPath + "-tmp"), true)
为了 JAVA 我实现了这个。希望对您有所帮助:
val fs = FileSystem.get(spark.sparkContext().hadoopConfiguration());
File dir = new File(System.getProperty("user.dir") + "/my.csv/");
File[] files = dir.listFiles((d, name) -> name.endsWith(".csv"));
fs.rename(new Path(files[0].toURI()), new Path(System.getProperty("user.dir") + "/csvDirectory/newData.csv"));
fs.delete(new Path(System.getProperty("user.dir") + "/my.csv/"), true);
我正在接收要保存在 S3 中的流数据 myDStream
(DStream[String]
)(基本上,对于这个问题,我想保存到哪里并不重要输出,但我提到它以防万一)。
以下代码运行良好,但它保存的文件夹名称类似于 jsonFile-19-45-46.json
,然后在文件夹内保存文件 _SUCCESS
和 part-00000
.
是否可以将每个 RDD[String]
(这些是 JSON 字符串)数据保存到 JSON 文件 而不是文件夹中?本以为repartition(1)
一定要使出这一招,没想到
myDStream.foreachRDD { rdd =>
// datetimeString = ....
rdd.repartition(1).saveAsTextFile("s3n://mybucket/keys/jsonFile-"+datetimeString+".json")
}
据我所知,没有将其另存为文件的选项。因为它是一个分布式处理框架,所以在单个文件上写入不是一个好习惯,而不是每个分区在指定路径中写入它自己的文件。
We can pass only output directory where we wanted to save the data. OutputWriter will create file(s)(depends on partitions) inside specified path with
part-
file name prefix.
作为 rdd.collect.mkString("\n")
的替代方法,您可以使用 hadoop 文件系统库通过将 part-00000
文件移动到它的位置来清理输出。下面的代码在本地文件系统和 HDFS 上完美运行,但我无法使用 S3 对其进行测试:
val outputPath = "path/to/some/file.json"
rdd.saveAsTextFile(outputPath + "-tmp")
import org.apache.hadoop.fs.Path
val fs = org.apache.hadoop.fs.FileSystem.get(spark.sparkContext.hadoopConfiguration)
fs.rename(new Path(outputPath + "-tmp/part-00000"), new Path(outputPath))
fs.delete(new Path(outputPath + "-tmp"), true)
为了 JAVA 我实现了这个。希望对您有所帮助:
val fs = FileSystem.get(spark.sparkContext().hadoopConfiguration());
File dir = new File(System.getProperty("user.dir") + "/my.csv/");
File[] files = dir.listFiles((d, name) -> name.endsWith(".csv"));
fs.rename(new Path(files[0].toURI()), new Path(System.getProperty("user.dir") + "/csvDirectory/newData.csv"));
fs.delete(new Path(System.getProperty("user.dir") + "/my.csv/"), true);