Error when generating typed transformations from JSON data with case class
I'm trying to create a strongly typed Dataset for the case class Person. Here is my current code:
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import scala.collection.mutable.ArrayBuffer
import org.apache.spark.sql.types._
case class Person(name: String, phone: String, address: Map[String, String])
val schema = ArrayBuffer[StructField]()
schema.appendAll(List(StructField("name", StringType), StructField("phone", StringType)))
schema.append(StructField("address", MapType(StringType, StringType)))
implicit val personEncoder = org.apache.spark.sql.Encoders.kryo[Person]
val sparkConf = new SparkConf().setAppName("dynamic-json-schema").setMaster("local")
val spark = SparkSession.builder().config(sparkConf).getOrCreate()
import spark.implicits._
val jsonDF = spark.read
  .schema(StructType(schema.toList))
  .json("""apath\data.json""")
  .toDF()
jsonDF.as[Person].select("name", "phone")
Here is the input JSON data:
{"name":"Michael","phone":"2342233","address":{"street":"Lincoln", "number":"344", "postcode":"3245NM"}}
{"name":"Tony","phone":"4342223","address":{"street":"Pizla", "number":"12", "postcode":"9088AL"}}
{"name":"Maria","phone":"32233454","address":{"street":"Coco", "number":"32", "postcode":"8900PO"}}
However, I get the following error:
Exception in thread "main" org.apache.spark.sql.AnalysisException: Try to map struct<address:struct<number:string,postcode:string,street:string>,name:string,phone:string> to Tuple1, but failed as the number of fields does not line up.;
I'm using Spark 2.2.0. I know this has something to do with the nested JSON and the mapping to the Person class, but what exactly is the reason Spark cannot convert Dataset[Row] -> Dataset[Person]?
It works fine if you remove the Kryo encoder. The nesting of your data is not the problem; the same error also occurs with non-nested JSON.
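To see why the analyzer complains about a Tuple1, compare the schemas of the two encoders involved. A minimal sketch, assuming the Person case class from the question is in scope:

import org.apache.spark.sql.Encoders

// Kryo serializes the whole object into a single binary `value` column,
// so as far as the analyzer is concerned the target type has exactly one
// field (a Tuple1). It cannot line that up with the three JSON columns.
println(Encoders.kryo[Person].schema)
// StructType(StructField(value,BinaryType,true))

// The product encoder that spark.implicits._ derives for case classes has
// one field per constructor parameter and matches the JSON columns by name.
println(Encoders.product[Person].schema)

The example below, which gives the nested address its own case class, needs no explicit encoder at all; spark.implicits._ derives product encoders for both case classes: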
import org.apache.spark.sql.SparkSession

case class Address(street: String, number: String, postcode: String)
case class Person(name: String, phone: String, address: Address)

object JsonReader extends App {
  val sparkSession = SparkSession.builder()
    .master("local")
    .getOrCreate()
  import sparkSession.implicits._
  val p = JsonReader.getClass.getClassLoader.getResource("input.json").toURI.getPath
  val df = sparkSession.read.json(p).as[Person]
  df.printSchema()
  df.show()
  df.select($"address.*").show
}
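The same fix also applies to the original Map-based Person from the question: drop the implicit Kryo encoder and keep everything else. A minimal sketch of that variant (the object name MapJsonReader is illustrative), reusing the question's placeholder path:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types._

// The question's original case class, with the address kept as a Map.
case class Person(name: String, phone: String, address: Map[String, String])

object MapJsonReader extends App {
  val spark = SparkSession.builder().master("local").getOrCreate()
  import spark.implicits._  // derives the product encoder for Person

  // The explicit MapType schema is still needed so that `address` is read
  // as a map instead of the struct that schema inference would produce.
  val schema = StructType(List(
    StructField("name", StringType),
    StructField("phone", StringType),
    StructField("address", MapType(StringType, StringType))))

  // """apath\data.json""" is the question's placeholder path.
  val people = spark.read.schema(schema).json("""apath\data.json""").as[Person]
  people.select("name", "phone").show()
}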