如何在单个 csv 文件中保存 pyspark 数据框
How to save pyspark data frame in a single csv file
这是 话题的延续。
我正在尝试将我的 pyspark 数据框 df 保存在我的 pyspark 3.0.1 中。所以我写了
df.coalesce(1).write.csv('mypath/df.csv)
但是执行此操作后,我在 mypath 中看到一个名为 df.csv 的文件夹,其中包含以下 4 个文件
1._committed_..
2._started_...
3._Success
4. part-00000-.. .csv
你能建议我如何保存 df.csv
中的所有数据吗?
如果你想得到一个名为df.csv
的文件作为输出,你可以先写入一个临时文件夹,然后移动Spark生成的零件文件并重命名。
这些步骤可以通过 JVM 网关使用 Hadoop FileSystem API 完成:
temp_path = "mypath/__temp"
target_path = "mypath/df.csv"
df.coalesce(1).write.mode("overwrite").csv(temp_path)
Path = sc._gateway.jvm.org.apache.hadoop.fs.Path
# get the part file generated by spark write
fs = Path(temp_path).getFileSystem(sc._jsc.hadoopConfiguration())
csv_part_file = fs.globStatus(Path(temp_path + "/part*"))[0].getPath()
# move and rename the file
fs.rename(csv_part_file, Path(target_path))
fs.delete(Path(temp_path), True)
您可以使用 repartition(1)
将文件保存在 1 个 csv 分区中,然后重命名此 csv 并将其移动到所需的文件夹。
这是一个函数:
df
: 你的df
fileName
: 您想要的 csv 文件名称
filePath
: 您要保存到的文件夹
def export_csv(df, fileName, filePath):
filePathDestTemp = filePath + ".dir/"
df\
.repartition(1)\
.write\
.save(filePathDestTemp)
listFiles = dbutils.fs.ls(filePathDestTemp)
for subFiles in listFiles:
if subFiles.name[-4:] == ".csv":
dbutils.fs.cp (filePathDestTemp + subFiles.name, filePath + fileName+ '.csv')
dbutils.fs.rm(filePathDestTemp, recurse=True)
这是
我正在尝试将我的 pyspark 数据框 df 保存在我的 pyspark 3.0.1 中。所以我写了
df.coalesce(1).write.csv('mypath/df.csv)
但是执行此操作后,我在 mypath 中看到一个名为 df.csv 的文件夹,其中包含以下 4 个文件
1._committed_..
2._started_...
3._Success
4. part-00000-.. .csv
你能建议我如何保存 df.csv
中的所有数据吗?
如果你想得到一个名为df.csv
的文件作为输出,你可以先写入一个临时文件夹,然后移动Spark生成的零件文件并重命名。
这些步骤可以通过 JVM 网关使用 Hadoop FileSystem API 完成:
temp_path = "mypath/__temp"
target_path = "mypath/df.csv"
df.coalesce(1).write.mode("overwrite").csv(temp_path)
Path = sc._gateway.jvm.org.apache.hadoop.fs.Path
# get the part file generated by spark write
fs = Path(temp_path).getFileSystem(sc._jsc.hadoopConfiguration())
csv_part_file = fs.globStatus(Path(temp_path + "/part*"))[0].getPath()
# move and rename the file
fs.rename(csv_part_file, Path(target_path))
fs.delete(Path(temp_path), True)
您可以使用 repartition(1)
将文件保存在 1 个 csv 分区中,然后重命名此 csv 并将其移动到所需的文件夹。
这是一个函数:
df
: 你的df
fileName
: 您想要的 csv 文件名称
filePath
: 您要保存到的文件夹
def export_csv(df, fileName, filePath):
filePathDestTemp = filePath + ".dir/"
df\
.repartition(1)\
.write\
.save(filePathDestTemp)
listFiles = dbutils.fs.ls(filePathDestTemp)
for subFiles in listFiles:
if subFiles.name[-4:] == ".csv":
dbutils.fs.cp (filePathDestTemp + subFiles.name, filePath + fileName+ '.csv')
dbutils.fs.rm(filePathDestTemp, recurse=True)