如何从 Spark 读写 DataFrame

How to read and write DataFrame from Spark

我需要将 DataFrame 保存为 CSV 或 parquet 格式(作为单个文件),然后再次打开它。数据量不会超过60Mb,所以单个文件是合理的方案。这个简单的任务让我很头疼......这是我试过的:

读取文件(如果存在):

df = sqlContext
               .read.parquet("s3n://bucket/myTest.parquet")
               .toDF("key", "value", "date", "qty")

写入文件:

df.write.parquet("s3n://bucket/myTest.parquet")

这不起作用,因为:

1) write 使用 hadoopish 文件创建了文件夹 myTest.parquet,后来我无法用 .read.parquet("s3n://bucket/myTest.parquet") 读取这些文件。事实上,我不关心多个 hadoopish 文件,除非我以后可以轻松地将它们读入 DataFrame。可能吗?

2) 我一直在使用我在 S3 中更新和覆盖的同一个文件 myTest.parquet。它告诉我该文件无法保存,因为它已经存在。

那么,有人可以告诉我执行 read/write 循环的正确方法吗?文件格式对我来说并不重要(csv、parquet、csv、hadoopish 文件),除非我可以进行读写循环。

您可以使用 saveAsTable("TableName") 保存您的 DataFrame 并使用 table("TableName") 读取它。位置可以通过spark.sql.warehouse.dir设置。您可以用 mode(SaveMode.Ignore) 覆盖文件。你可以read这里更多来自官方文档。

在 Java 中看起来像这样:

SparkSession spark = ...
spark.conf().set("spark.sql.warehouse.dir", "hdfs://localhost:9000/tables");
Dataset<Row> data = ...
data.write().mode(SaveMode.Overwrite).saveAsTable("TableName");

现在您可以通过以下方式读取数据:

spark.read().table("TableName");