如何将 RDD 数据保存到 json 文件中,而不是文件夹中

How to save RDD data into json files, not folders

我正在接收要保存在 S3 中的流数据 myDStream (DStream[String])(基本上,对于这个问题,我想保存到哪里并不重要输出,但我提到它以防万一)。

以下代码运行良好,但它保存的文件夹名称类似于 jsonFile-19-45-46.json,然后在文件夹内保存文件 _SUCCESSpart-00000.

是否可以将每个 RDD[String](这些是 JSON 字符串)数据保存到 JSON 文件 而不是文件夹中?本以为repartition(1)一定要使出这一招,没想到

    myDStream.foreachRDD { rdd => 
       // datetimeString = ....
       rdd.repartition(1).saveAsTextFile("s3n://mybucket/keys/jsonFile-"+datetimeString+".json")
    }

据我所知,没有将其另存为文件的选项。因为它是一个分布式处理框架,所以在单个文件上写入不是一个好习惯,而不是每个分区在指定路径中写入它自己的文件。

We can pass only output directory where we wanted to save the data. OutputWriter will create file(s)(depends on partitions) inside specified path with part- file name prefix.

作为 rdd.collect.mkString("\n") 的替代方法,您可以使用 hadoop 文件系统库通过将 part-00000 文件移动到它的位置来清理输出。下面的代码在本地文件系统和 HDFS 上完美运行,但我无法使用 S3 对其进行测试:

val outputPath = "path/to/some/file.json"
rdd.saveAsTextFile(outputPath + "-tmp")

import org.apache.hadoop.fs.Path
val fs = org.apache.hadoop.fs.FileSystem.get(spark.sparkContext.hadoopConfiguration)
fs.rename(new Path(outputPath + "-tmp/part-00000"), new Path(outputPath))
fs.delete(new Path(outputPath  + "-tmp"), true)

为了 JAVA 我实现了这个。希望对您有所帮助:

    val fs = FileSystem.get(spark.sparkContext().hadoopConfiguration());
    File dir = new File(System.getProperty("user.dir") + "/my.csv/");
    File[] files = dir.listFiles((d, name) -> name.endsWith(".csv"));
    fs.rename(new Path(files[0].toURI()), new Path(System.getProperty("user.dir") + "/csvDirectory/newData.csv"));
    fs.delete(new Path(System.getProperty("user.dir") + "/my.csv/"), true);