如何将 PySpark 中的 table 数据框导出到 csv?

How to export a table dataframe in PySpark to csv?

我正在使用 Spark 1.3.1 (PySpark),并且我使用 SQL 查询生成了 table。我现在有一个 DataFrame 对象。我想将此 DataFrame 对象(我将其命名为 "table")导出到一个 csv 文件,以便我可以对其进行操作并绘制列。如何将 DataFrame "table" 导出到 csv 文件?

谢谢!

如果数据框适合驱动程序内存并且您想保存到本地文件系统,您可以转换 Spark DataFrame to local Pandas DataFrame using toPandas 方法然后简单地使用 to_csv:

df.toPandas().to_csv('mycsv.csv')

否则你可以使用spark-csv:

  • Spark 1.3

    df.save('mycsv.csv', 'com.databricks.spark.csv')
    
  • Spark 1.4+

    df.write.format('com.databricks.spark.csv').save('mycsv.csv')
    

在Spark 2.0+中可以直接使用csv数据源:

df.write.csv('mycsv.csv')

如果你不能使用spark-csv,你可以这样做:

df.rdd.map(lambda x: ",".join(map(str, x))).coalesce(1).saveAsTextFile("file.csv")

如果您需要处理带有换行符或逗号的字符串,这将不起作用。使用这个:

import csv
import cStringIO

def row2csv(row):
    buffer = cStringIO.StringIO()
    writer = csv.writer(buffer)
    writer.writerow([str(s).encode("utf-8") for s in row])
    buffer.seek(0)
    return buffer.read().strip()

df.rdd.map(row2csv).coalesce(1).saveAsTextFile("file.csv")

这个怎么样(万一你不想要一个班轮)?

for row in df.collect():
    d = row.asDict()
    s = "%d\t%s\t%s\n" % (d["int_column"], d["string_column"], d["string_column"])
    f.write(s)

f 是一个打开的文件描述符。分隔符也是 TAB 字符,但很容易更改为您想要的任何内容。

对于 Apache Spark 2+,为了将数据帧保存到单个 csv 文件中。使用以下命令

query.repartition(1).write.csv("cc_out.csv", sep='|')

这里1表示我只需要一个csv分区。你可以根据你的要求改变它。

您需要将 Dataframe 重新分区到单个分区中,然后以 Unix 文件系统格式定义文件的格式、路径和其他参数,就这样,

df.repartition(1).write.format('com.databricks.spark.csv').save("/path/to/file/myfile.csv",header = 'true')

阅读更多关于 repartition function 详细了解 save function

然而,重新分区是一个代价高昂的函数,而 toPandas() 是最糟糕的。尝试在以前的语法中使用 .coalesce(1) 而不是 .repartition(1) 以获得更好的性能。

上阅读更多内容。

'''
I am late to the pary but: this will let me rename the file, move it to a desired directory and delete the unwanted additional directory spark made
'''

import shutil
import os
import glob

path = 'test_write'
#write single csv
students.repartition(1).write.csv(path)

#rename and relocate the csv
shutil.move(glob.glob(os.getcwd() + '\' + path + '\' + r'*.csv')[0], os.getcwd()+ '\' + path+ '.csv')

#remove additional directory
shutil.rmtree(os.getcwd()+'\'+path)

尝试显示 (df) 并使用结果中的下载选项。请注意:使用此选项只能下载 100 万行,但速度非常快。

使用 PySpark

在 Spark 3.0+ 中写入 csv 的最简单方法

sdf.write.csv("/path/to/csv/data.csv")

这可以根据您使用的 spark 节点数生成多个文件。如果您想在单个文件中获取它,请使用重新分区。

sdf.repartition(1).write.csv("/path/to/csv/data.csv")

使用Pandas

如果你的数据量不是很大,可以放在本地python,那你也可以利用pandas

sdf.toPandas().to_csv("/path/to/csv/data.csv", index=False)

使用考拉

sdf.to_koalas().to_csv("/path/to/csv/data.csv", index=False)

我使用了 pandas 的方法,这给了我可怕的表现。最后拖了这么久,我停下来寻找另一种方法。

如果您正在寻找一种写入一个 csv 而不是多个 csv 的方法,这就是您要找的:

df.coalesce(1).write.csv("train_dataset_processed", header=True)

它将处理我的数据集的时间从 2 小时以上减少到 2 分钟