如何在 Spark 中创建架构文件
How to create a Schema file in Spark
我正在尝试读取架构文件(文本文件)并将其应用到我的 CSV 文件中,而无需 header。因为我已经有一个架构文件,所以我不想使用 InferSchema
选项,这是一个开销。
我的输入模式文件如下所示,
"num IntegerType","letter StringType"
我正在尝试使用以下代码创建模式文件,
val schema_file = spark.read.textFile("D:\Users\Documents\schemaFile.txt")
val struct_type = schema_file.flatMap(x => x.split(",")).map(b => (b.split(" ")(0).stripPrefix("\"").asInstanceOf[String],b.split(" ")(1).stripSuffix("\"").asInstanceOf[org.apache.spark.sql.types.DataType])).foreach(x=>println(x))
我收到如下错误
Exception in thread "main" java.lang.UnsupportedOperationException: No Encoder found for org.apache.spark.sql.types.DataType
- 字段 (class: "org.apache.spark.sql.types.DataType", 名称: "_2")
- 根 class: "scala.Tuple2"
并尝试将其用作架构文件,同时使用如下所示的 spark.read.csv
并将其写入 ORC 文件
val df=spark.read
.format("org.apache.spark.csv")
.option("header", false)
.option("inferSchema", true)
.option("samplingRatio",0.01)
.option("nullValue", "NULL")
.option("delimiter","|")
.schema(schema_file)
.csv("D:\Users\sampleFile.txt")
.toDF().write.format("orc").save("D:\Users\ORC")
需要帮助将文本文件转换为模式文件并将我的输入 CSV 文件转换为 ORC。
要从 text
文件创建模式,请创建一个函数以 match
type
和 return DataType
作为
def getType(raw: String): DataType = {
raw match {
case "ByteType" => ByteType
case "ShortType" => ShortType
case "IntegerType" => IntegerType
case "LongType" => LongType
case "FloatType" => FloatType
case "DoubleType" => DoubleType
case "BooleanType" => BooleanType
case "TimestampType" => TimestampType
case _ => StringType
}
}
现在通过读取架构文件来创建架构
val schema = Source.fromFile("schema.txt").getLines().toList
.flatMap(_.split(",")).map(_.replaceAll("\"", "").split(" "))
.map(x => StructField(x(0), getType(x(1)), true))
现在将 csv 文件读取为
spark.read
.option("samplingRatio", "0.01")
.option("delimiter", "|")
.option("nullValue", "NULL")
.schema(StructType(schema))
.csv("data.csv")
希望对您有所帮助!
像这样的东西更健壮一些,因为它使用了配置单元元存储:
import org.apache.hadoop.hive.metastore.api.FieldSchema
def sparkToHiveSchema(schema: StructType): List[FieldSchema] ={
schema.map(field => new FieldSchema(field.name,field.dataType.catalogString,field.getComment.getOrElse(""))).toList
}
``
您可以像这样指定架构:
import org.apache.spark.sql.types.{StructType, StructField, StringType,IntegerType};
例如:
val schema = new StructType(
Array(
StructField("Age",IntegerType,true),
StructField("Name",StringType,true),
)
)
val data = spark.read.option("header", "false").schema(schema).csv("filename.csv")
data.show()
这将直接在数据框中创建它
您可以按以下格式创建名为 schema.json
的 JSON 文件
{
"fields": [
{
"metadata": {},
"name": "first_fields",
"nullable": true,
"type": "string"
},
{
"metadata": {},
"name": "double_field",
"nullable": true,
"type": "double"
}
],
"type": "struct"
}
通过读取此文件创建结构模式
rdd = spark.sparkContext.wholeTextFiles("s3://<bucket>/schema.json")
text = rdd.collect()[0][1]
dict = json.loads(str(text))
custom_schema = StructType.fromJson(dict)
之后可以使用struct作为schema读取csv文件
val df=spark.read
.format("org.apache.spark.csv")
.option("header", false)
.option("inferSchema", true)
.option("samplingRatio",0.01)
.option("nullValue", "NULL")
.option("delimiter","|")
.schema(custom_schema)
.csv("D:\Users\sampleFile.txt")
.toDF().write.format("orc").save("D:\Users\ORC")
我正在尝试读取架构文件(文本文件)并将其应用到我的 CSV 文件中,而无需 header。因为我已经有一个架构文件,所以我不想使用 InferSchema
选项,这是一个开销。
我的输入模式文件如下所示,
"num IntegerType","letter StringType"
我正在尝试使用以下代码创建模式文件,
val schema_file = spark.read.textFile("D:\Users\Documents\schemaFile.txt")
val struct_type = schema_file.flatMap(x => x.split(",")).map(b => (b.split(" ")(0).stripPrefix("\"").asInstanceOf[String],b.split(" ")(1).stripSuffix("\"").asInstanceOf[org.apache.spark.sql.types.DataType])).foreach(x=>println(x))
我收到如下错误
Exception in thread "main" java.lang.UnsupportedOperationException: No Encoder found for org.apache.spark.sql.types.DataType
- 字段 (class: "org.apache.spark.sql.types.DataType", 名称: "_2") - 根 class: "scala.Tuple2"
并尝试将其用作架构文件,同时使用如下所示的 spark.read.csv
并将其写入 ORC 文件
val df=spark.read
.format("org.apache.spark.csv")
.option("header", false)
.option("inferSchema", true)
.option("samplingRatio",0.01)
.option("nullValue", "NULL")
.option("delimiter","|")
.schema(schema_file)
.csv("D:\Users\sampleFile.txt")
.toDF().write.format("orc").save("D:\Users\ORC")
需要帮助将文本文件转换为模式文件并将我的输入 CSV 文件转换为 ORC。
要从 text
文件创建模式,请创建一个函数以 match
type
和 return DataType
作为
def getType(raw: String): DataType = {
raw match {
case "ByteType" => ByteType
case "ShortType" => ShortType
case "IntegerType" => IntegerType
case "LongType" => LongType
case "FloatType" => FloatType
case "DoubleType" => DoubleType
case "BooleanType" => BooleanType
case "TimestampType" => TimestampType
case _ => StringType
}
}
现在通过读取架构文件来创建架构
val schema = Source.fromFile("schema.txt").getLines().toList
.flatMap(_.split(",")).map(_.replaceAll("\"", "").split(" "))
.map(x => StructField(x(0), getType(x(1)), true))
现在将 csv 文件读取为
spark.read
.option("samplingRatio", "0.01")
.option("delimiter", "|")
.option("nullValue", "NULL")
.schema(StructType(schema))
.csv("data.csv")
希望对您有所帮助!
像这样的东西更健壮一些,因为它使用了配置单元元存储:
import org.apache.hadoop.hive.metastore.api.FieldSchema
def sparkToHiveSchema(schema: StructType): List[FieldSchema] ={
schema.map(field => new FieldSchema(field.name,field.dataType.catalogString,field.getComment.getOrElse(""))).toList
}
``
您可以像这样指定架构:
import org.apache.spark.sql.types.{StructType, StructField, StringType,IntegerType};
例如:
val schema = new StructType(
Array(
StructField("Age",IntegerType,true),
StructField("Name",StringType,true),
)
)
val data = spark.read.option("header", "false").schema(schema).csv("filename.csv")
data.show()
这将直接在数据框中创建它
您可以按以下格式创建名为 schema.json
的 JSON 文件
{
"fields": [
{
"metadata": {},
"name": "first_fields",
"nullable": true,
"type": "string"
},
{
"metadata": {},
"name": "double_field",
"nullable": true,
"type": "double"
}
],
"type": "struct"
}
通过读取此文件创建结构模式
rdd = spark.sparkContext.wholeTextFiles("s3://<bucket>/schema.json")
text = rdd.collect()[0][1]
dict = json.loads(str(text))
custom_schema = StructType.fromJson(dict)
之后可以使用struct作为schema读取csv文件
val df=spark.read
.format("org.apache.spark.csv")
.option("header", false)
.option("inferSchema", true)
.option("samplingRatio",0.01)
.option("nullValue", "NULL")
.option("delimiter","|")
.schema(custom_schema)
.csv("D:\Users\sampleFile.txt")
.toDF().write.format("orc").save("D:\Users\ORC")