使用 Spark SQL 将 JSON 文件数据插入 SQL 表

Insert JSON file data into a SQL table using Spark SQL

我正在尝试使用 Spark SQL 将 JSON 文件数据插入 SQL 表。我的示例 JSON 文件如下:

{
    "id": "value_string",
    "aggregate_id": "value_string",
    "type": "value_string",
    "timestamp": "value_string",
    "data": {
        "customer_id": "value_string",
        "name": "value_string"
    }
}

我想使用 Spark 将这些数据插入 SQL 表,尝试按如下方式创建,但没有成功:

     // Asker's non-working attempt: C#-style property classes intended to mirror the
     // sample JSON, followed by a JsonConvert call wrapped around a Spark JSON read.

     // Intended shape of the nested "data" object in the sample JSON.
     // NOTE(review): the sample JSON shows only customer_id and name; birthdate has
     // no counterpart there.
     public class DataOfPerson
        {
            public string name { get; set; }
            public string birthdate { get; set; }
            public string customer_id { get; set; }

        }
        // Top-level event fields (id, aggregate_id, type, timestamp) plus the nested payload.
        public class Person
        {
            public string id { get; set; }
            public string aggregate_id { get; set; }
            public string type { get; set; }
            public string timestamp { get; set; }  
            // NOTE(review): declared as a list named dataOfPerson, whereas the sample
            // JSON carries a single object under the key "data".
            public List<DataOfPerson> dataOfPerson { get; set; }
        }
        // Wrapper exposing a "person" member.
        // NOTE(review): the sample JSON has no enclosing "person" key.
        public class RootObject
        {
            public Person person { get; set; }
        }
        // NOTE(review): parentheses are unbalanced on this line, and the argument passed
        // to DeserializeObject is the result of sqlContext.read.json rather than a JSON string.
        var root = JsonConvert.DeserializeObject<RootObject> (sqlContext.read.json(s"abfss://abc@xyz/events.json")

/**
 * Flattens the first level of struct columns in `nestedDf`.
 *
 * Non-struct top-level columns are kept unchanged and come first, in schema order.
 * Each top-level struct column is then replaced by one column per direct child field
 * (selected as `parent.child`), also in schema order. Structs nested more than one
 * level deep are not expanded further.
 *
 * Only columns whose type is itself a struct are expanded. Arrays or maps that merely
 * contain structs are treated as flat columns: a substring test on the type name
 * ("Struct") would match them as well and then fail when star-expanding a non-struct.
 *
 * NOTE: a child selected as `parent.child` is named after its leaf in the result
 * (Spark's default naming), so a child that shares its name with another selected
 * column produces duplicate column names; alias upstream if that matters.
 *
 * @param spark    unused; retained so existing call sites keep compiling
 * @param nestedDf DataFrame whose top-level struct columns should be expanded
 * @return a DataFrame with the flat columns followed by the expanded struct fields
 */
def flattenDataFrame(spark: SparkSession, nestedDf: DataFrame): DataFrame = {
  import org.apache.spark.sql.types.StructType

  val fields = nestedDf.schema.fields.toSeq

  // Columns that are passed through untouched.
  val flatCols = fields.filterNot(_.dataType.isInstanceOf[StructType]).map(_.name)

  // Child names come straight from the struct's schema, so no intermediate
  // `select("parent.*")` plan has to be built just to list them.
  val nestedCols = fields.flatMap { field =>
    field.dataType match {
      case struct: StructType =>
        struct.fieldNames.toSeq.map(child => s"${field.name}.$child")
      case _ =>
        Seq.empty[String]
    }
  }

  val selected = (flatCols ++ nestedCols).map(name => col(name))
  nestedDf.select(selected: _*)
}