如何将数据从 Spark SQL 导出到 CSV
How to export data from Spark SQL to CSV
此命令适用于 HiveQL:
insert overwrite directory '/data/home.csv' select * from testtable;
但是使用 Spark SQL 我收到一个错误 org.apache.spark.sql.hive.HiveQl
堆栈跟踪:
java.lang.RuntimeException: Unsupported language features in query:
insert overwrite directory '/data/home.csv' select * from testtable
请指导我在 Spark 中编写导出到 CSV 功能 SQL。
错误消息表明这不是查询语言支持的功能。但是您可以像往常一样通过 RDD 接口(df.rdd.saveAsTextFile
)以任何格式保存 DataFrame。或者您可以查看 https://github.com/databricks/spark-csv.
您可以使用下面的语句将dataframe的内容写成CSV格式
df.write.csv("/data/home/csv")
如果您需要将整个数据帧写入单个 CSV 文件,请使用
df.coalesce(1).write.csv("/data/home/sample.csv")
对于spark 1.x,你可以使用spark-csv将结果写入CSV文件
下面的 scala 片段会有所帮助
import org.apache.spark.sql.hive.HiveContext
// sc - existing spark context
val sqlContext = new HiveContext(sc)
val df = sqlContext.sql("SELECT * FROM testtable")
df.write.format("com.databricks.spark.csv").save("/data/home/csv")
将内容写入单个文件
import org.apache.spark.sql.hive.HiveContext
// sc - existing spark context
val sqlContext = new HiveContext(sc)
val df = sqlContext.sql("SELECT * FROM testtable")
df.coalesce(1).write.format("com.databricks.spark.csv").save("/data/home/sample.csv")
最简单的方法是映射DataFrame的RDD并使用mkString:
df.rdd.map(x=>x.mkString(","))
从 Spark 1.5 开始(甚至更早)
df.map(r=>r.mkString(","))
也会这样做
如果你想要 CSV 转义,你可以使用 apache commons lang。例如这是我们使用的代码
def DfToTextFile(path: String,
df: DataFrame,
delimiter: String = ",",
csvEscape: Boolean = true,
partitions: Int = 1,
compress: Boolean = true,
header: Option[String] = None,
maxColumnLength: Option[Int] = None) = {
def trimColumnLength(c: String) = {
val col = maxColumnLength match {
case None => c
case Some(len: Int) => c.take(len)
}
if (csvEscape) StringEscapeUtils.escapeCsv(col) else col
}
def rowToString(r: Row) = {
val st = r.mkString("~-~").replaceAll("[\p{C}|\uFFFD]", "") //remove control characters
st.split("~-~").map(trimColumnLength).mkString(delimiter)
}
def addHeader(r: RDD[String]) = {
val rdd = for (h <- header;
if partitions == 1; //headers only supported for single partitions
tmpRdd = sc.parallelize(Array(h))) yield tmpRdd.union(r).coalesce(1)
rdd.getOrElse(r)
}
val rdd = df.map(rowToString).repartition(partitions)
val headerRdd = addHeader(rdd)
if (compress)
headerRdd.saveAsTextFile(path, classOf[GzipCodec])
else
headerRdd.saveAsTextFile(path)
}
上面使用 spark-csv 的答案是正确的,但存在一个问题 - 该库会根据数据帧分区创建多个文件。而这不是我们通常需要的。因此,您可以将所有分区合并为一个:
df.coalesce(1).
write.
format("com.databricks.spark.csv").
option("header", "true").
save("myfile.csv")
并将 lib (name "part-00000") 的输出重命名为所需的文件名。
此博客 post 提供了更多详细信息:https://fullstackml.com/2015/12/21/how-to-export-data-frame-from-apache-spark/
由于 Spark 2.X
spark-csv
集成为 native datasource。因此,必要的语句简化为 (windows)
df.write
.option("header", "true")
.csv("file:///C:/out.csv")
或 UNIX
df.write
.option("header", "true")
.csv("/var/out.csv")
注意:正如评论所说,它正在创建包含分区名称的目录,而不是 standard CSV file。然而,这很可能是您想要的,否则您要么使您的驱动程序崩溃(内存不足),要么您可能正在使用非分布式环境。
借助 spark-csv,我们可以写入 CSV 文件。
val dfsql = sqlContext.sql("select * from tablename")
dfsql.write.format("com.databricks.spark.csv").option("header","true").save("output.csv")`
在 DATAFRAME 中输入代码:
val p=spark.read.format("csv").options(Map("header"->"true","delimiter"->"^")).load("filename.csv")
此命令适用于 HiveQL:
insert overwrite directory '/data/home.csv' select * from testtable;
但是使用 Spark SQL 我收到一个错误 org.apache.spark.sql.hive.HiveQl
堆栈跟踪:
java.lang.RuntimeException: Unsupported language features in query:
insert overwrite directory '/data/home.csv' select * from testtable
请指导我在 Spark 中编写导出到 CSV 功能 SQL。
错误消息表明这不是查询语言支持的功能。但是您可以像往常一样通过 RDD 接口(df.rdd.saveAsTextFile
)以任何格式保存 DataFrame。或者您可以查看 https://github.com/databricks/spark-csv.
您可以使用下面的语句将dataframe的内容写成CSV格式
df.write.csv("/data/home/csv")
如果您需要将整个数据帧写入单个 CSV 文件,请使用
df.coalesce(1).write.csv("/data/home/sample.csv")
对于spark 1.x,你可以使用spark-csv将结果写入CSV文件
下面的 scala 片段会有所帮助
import org.apache.spark.sql.hive.HiveContext
// sc - existing spark context
val sqlContext = new HiveContext(sc)
val df = sqlContext.sql("SELECT * FROM testtable")
df.write.format("com.databricks.spark.csv").save("/data/home/csv")
将内容写入单个文件
import org.apache.spark.sql.hive.HiveContext
// sc - existing spark context
val sqlContext = new HiveContext(sc)
val df = sqlContext.sql("SELECT * FROM testtable")
df.coalesce(1).write.format("com.databricks.spark.csv").save("/data/home/sample.csv")
最简单的方法是映射DataFrame的RDD并使用mkString:
df.rdd.map(x=>x.mkString(","))
从 Spark 1.5 开始(甚至更早)
df.map(r=>r.mkString(","))
也会这样做
如果你想要 CSV 转义,你可以使用 apache commons lang。例如这是我们使用的代码
def DfToTextFile(path: String,
df: DataFrame,
delimiter: String = ",",
csvEscape: Boolean = true,
partitions: Int = 1,
compress: Boolean = true,
header: Option[String] = None,
maxColumnLength: Option[Int] = None) = {
def trimColumnLength(c: String) = {
val col = maxColumnLength match {
case None => c
case Some(len: Int) => c.take(len)
}
if (csvEscape) StringEscapeUtils.escapeCsv(col) else col
}
def rowToString(r: Row) = {
val st = r.mkString("~-~").replaceAll("[\p{C}|\uFFFD]", "") //remove control characters
st.split("~-~").map(trimColumnLength).mkString(delimiter)
}
def addHeader(r: RDD[String]) = {
val rdd = for (h <- header;
if partitions == 1; //headers only supported for single partitions
tmpRdd = sc.parallelize(Array(h))) yield tmpRdd.union(r).coalesce(1)
rdd.getOrElse(r)
}
val rdd = df.map(rowToString).repartition(partitions)
val headerRdd = addHeader(rdd)
if (compress)
headerRdd.saveAsTextFile(path, classOf[GzipCodec])
else
headerRdd.saveAsTextFile(path)
}
上面使用 spark-csv 的答案是正确的,但存在一个问题 - 该库会根据数据帧分区创建多个文件。而这不是我们通常需要的。因此,您可以将所有分区合并为一个:
df.coalesce(1).
write.
format("com.databricks.spark.csv").
option("header", "true").
save("myfile.csv")
并将 lib (name "part-00000") 的输出重命名为所需的文件名。
此博客 post 提供了更多详细信息:https://fullstackml.com/2015/12/21/how-to-export-data-frame-from-apache-spark/
由于 Spark 2.X
spark-csv
集成为 native datasource。因此,必要的语句简化为 (windows)
df.write
.option("header", "true")
.csv("file:///C:/out.csv")
或 UNIX
df.write
.option("header", "true")
.csv("/var/out.csv")
注意:正如评论所说,它正在创建包含分区名称的目录,而不是 standard CSV file。然而,这很可能是您想要的,否则您要么使您的驱动程序崩溃(内存不足),要么您可能正在使用非分布式环境。
借助 spark-csv,我们可以写入 CSV 文件。
val dfsql = sqlContext.sql("select * from tablename")
dfsql.write.format("com.databricks.spark.csv").option("header","true").save("output.csv")`
在 DATAFRAME 中输入代码:
val p=spark.read.format("csv").options(Map("header"->"true","delimiter"->"^")).load("filename.csv")