Programmatically generate the schema AND the data for a dataframe in Apache Spark
I want to dynamically generate a dataframe containing a header record for a report, so I create a dataframe from the value of the string below:
val headerDescs : String = "Name,Age,Location"
val headerSchema = StructType(headerDescs.split(",").map(fieldName => StructField(fieldName, StringType, true)))
But now I want to do the same for the data (which is, in fact, the same data, i.e. the metadata).
I created an RDD:
val headerRDD = sc.parallelize(headerDescs.split(","))
I then intended to use createDataFrame to create it:
val headerDf = sqlContext.createDataFrame(headerRDD, headerSchema)
However, that fails because createDataFrame expects an RDD[Row], whereas my RDD is an array of strings, and I can't find a way to convert my RDD to an RDD of Rows and then map the fields dynamically. The examples I have seen assume you know the number of columns beforehand, but I want the ability to eventually change the columns without changing the code, e.g. by putting the columns in a file.
Code excerpt based on the first answer:
val headerDescs : String = "Name,Age,Location"
// create the schema from a string, splitting by delimiter
val headerSchema = StructType(headerDescs.split(",").map(fieldName => StructField(fieldName, StringType, true)))
// create a row from a string, splitting by delimiter
val headerRDDRows = sc.parallelize(headerDescs.split(",")).map( a => Row(a))
val headerDf = sqlContext.createDataFrame(headerRDDRows, headerSchema)
headerDf.show()
Running this results in:
+--------+---+--------+
| Name|Age|Location|
+--------+---+--------+
| Name|
| Age|
|Location|
+--------+---+-------
To convert an RDD[Array[String]] to RDD[Row], you need to do the following:
import org.apache.spark.sql.Row
val headerRDD = sc.parallelize(Seq(headerDescs.split(","))).map(x=>Row(x(0),x(1),x(2)))
scala> val headerSchema = StructType(headerDescs.split(",").map(fieldName => StructField(fieldName, StringType, true)))
headerSchema: org.apache.spark.sql.types.StructType = StructType(StructField(Name,StringType,true), StructField(Age,StringType,true), StructField(Location,StringType,true))
scala> val headerRDD = sc.parallelize(Seq(headerDescs.split(","))).map(x=>Row(x(0),x(1),x(2)))
headerRDD: org.apache.spark.rdd.RDD[org.apache.spark.sql.Row] = MapPartitionsRDD[6] at map at <console>:34
scala> val headerDf = sqlContext.createDataFrame(headerRDD, headerSchema)
headerDf: org.apache.spark.sql.DataFrame = [Name: string, Age: string, Location: string]
scala> headerDf.printSchema
root
|-- Name: string (nullable = true)
|-- Age: string (nullable = true)
|-- Location: string (nullable = true)
scala> headerDf.show
+----+---+--------+
|Name|Age|Location|
+----+---+--------+
|Name|Age|Location|
+----+---+--------+
This will give you an RDD[Row].
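Note that the map above hard-codes three column accesses (x(0), x(1), x(2)). If the number of columns shouldn't be fixed in code, one way is Row.fromSeq, the same helper used in the file-reading example below; a minimal sketch (the variable names here are illustrative):
import org.apache.spark.sql.Row
// Build the Row from the whole array, however many columns it contains,
// so both the schema and the row are derived from the same split string
val dynamicRDD = sc.parallelize(Seq(headerDescs.split(","))).map(x => Row.fromSeq(x))
val dynamicDf = sqlContext.createDataFrame(dynamicRDD, headerSchema)
Adding or removing a column in headerDescs then requires no code changes.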
For reading from a file:
val vRDD = sc.textFile("..**filepath**.").map(_.split(",")).map(a => Row.fromSeq(a))
val headerDf = sqlContext.createDataFrame(vRDD, headerSchema)
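One caveat (not part of the snippet above): sc.textFile returns the file's header line, if there is one, as an ordinary data row. A common pattern for dropping it, sketched here under the assumption of a comma-delimited file (the path is a placeholder):
val lines = sc.textFile("/path/to/data.csv")   // placeholder path
val firstLine = lines.first()                  // the header line
val dataRDD = lines.filter(_ != firstLine)     // drop lines equal to the header
  .map(_.split(","))
  .map(a => Row.fromSeq(a))
val dataDf = sqlContext.createDataFrame(dataRDD, headerSchema)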
Using the Spark-CSV package:
val df = sqlContext.read
.format("com.databricks.spark.csv")
.option("header", "true") // Use first line of all files as header
.schema(headerSchema) // defining based on the custom schema
.load("cars.csv")
Or:
val df = sqlContext.read
.format("com.databricks.spark.csv")
.option("header", "true") // Use first line of all files as header
.option("inferSchema", "true") // Automatically infer data types
.load("cars.csv")
You can also explore the various options in the documentation.