使用 Pyspark 将文件从一个目录移动到 HDFS 中的另一个目录

Moving files from one directory to another directory in HDFS using Pyspark

我正在尝试从一个目录中读取所有 JSON 文件的数据,并使用以下代码将它们存储在 Spark Dataframe 中。 (效果很好)

spark = SparkSession.builder.getOrCreate()


df = spark.read.json("hdfs:///user/temp/backup_data/st_in_*/*/*.json",multiLine=True)

但是当我尝试使用以下代码保存包含多个文件的 DataFrame 时

df.write.json("hdfs:///user/another_dir/to_save_dir/")

它没有按预期存储文件并抛出类似 to_save_dir 已经存在的错误

我只想像从源目录读取文件到目标目录那样保存文件。

编辑:

问题是,当我读取多个文件并想将其写入目录时,Pyspark 中的过程是什么?我问这个的原因是因为一旦 spark 加载了所有文件,它就会创建一个数据帧,并且每个文件都是这个数据帧中的一行,我应该如何继续为数据帧中的每一行创建新文件

你得到的错误很明显,你试图写入的位置似乎已经存在。您可以通过指定 mode :

来选择覆盖它
df.write.mode("overwrite").json("hdfs:///user/another_dir/to_save_dir/")

但是,如果您的目的只是将文件从HDFS 中的一个位置移动到另一个位置,则不需要在Spark 中读取文件然后写入它们。相反,请尝试使用 Hadoop FS API:

conf = sc._jsc.hadoopConfiguration()
Path = sc._gateway.jvm.org.apache.hadoop.fs.Path
FileUtil = sc._gateway.jvm.org.apache.hadoop.fs.FileUtil

src_path = Path(src_folder)
dest_path = Path(dest_folder)

FileUtil.copy(src_path.getFileSystem(conf), 
              src_path,
              dest_path.getFileSystem(conf),
              dest_path,
              True,
              conf)