Insert JSON file data into a SQL table using Spark SQL
I am trying to insert JSON file data into a SQL table using Spark SQL.
A sample of my JSON file:
{
  "id": "value_string",
  "aggregate_id": "value_string",
  "type": "value_string",
  "timestamp": "value_string",
  "data": {
    "customer_id": "value_string",
    "name": "value_string"
  }
}
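For reference, Spark can infer the nested schema of this file on its own; a minimal sketch, assuming an active SparkSession named spark and the same abfss path as in my attempt below:

val nestedDf = spark.read
  .option("multiLine", "true") // the file is pretty-printed JSON, not JSON Lines
  .json("abfss://abc@xyz/events.json")
nestedDf.printSchema()
// root
//  |-- aggregate_id: string (nullable = true)
//  |-- data: struct (nullable = true)
//  |    |-- customer_id: string (nullable = true)
//  |    |-- name: string (nullable = true)
//  |-- id: string (nullable = true)
//  |-- timestamp: string (nullable = true)
//  |-- type: string (nullable = true)
// Note: data comes back as a single struct, not an array.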
I want to insert this into a SQL table using Spark. I tried to model it with the classes below, but it does not work:
public class DataOfPerson
{
    public string name { get; set; }
    public string birthdate { get; set; }
    public string customer_id { get; set; }
}
public class Person
{
    public string id { get; set; }
    public string aggregate_id { get; set; }
    public string type { get; set; }
    public string timestamp { get; set; }
    public List<DataOfPerson> dataOfPerson { get; set; }
}
public class RootObject
{
    public Person person { get; set; }
}
var root = JsonConvert.DeserializeObject<RootObject>(sqlContext.read.json(s"abfss://abc@xyz/events.json"))
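As far as I can tell, JsonConvert.DeserializeObject comes from Newtonsoft.Json for .NET and takes a JSON string, not a Spark DataFrame, so this mix of C# and the Scala Spark API cannot compile. I also tried staying entirely on the Spark side and flattening the nested data struct with the helper below: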
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.col

def flattenDataFrame(spark: SparkSession, nestedDf: DataFrame): DataFrame = {
  // Split the top-level columns into struct-typed and plain ones.
  var flatCols = Array.empty[String]
  var nestedCols = Array.empty[String]
  for ((name, dataType) <- nestedDf.dtypes) {
    if (dataType.contains("Struct")) nestedCols = nestedCols :+ name
    else flatCols = flatCols :+ name
  }
  // Expand each struct column into "struct.field" references.
  var nestedCol = Array.empty[String]
  for (nc <- nestedCols; c <- nestedDf.select(nc + ".*").columns) {
    nestedCol = nestedCol :+ (nc + "." + c)
  }
  // Select the plain columns plus the pulled-up struct fields.
  val allColumns = flatCols ++ nestedCol
  nestedDf.select(allColumns.map(col): _*)
}
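For completeness, this is how I intend to call the helper and load the result into the SQL table. A minimal sketch, reusing nestedDf from the read above; the JDBC URL, table name, and credentials are placeholders, and the exact options depend on the target database:

val flatDf = flattenDataFrame(spark, nestedDf)
// flatDf columns: aggregate_id, id, timestamp, type, customer_id, name

flatDf.write
  .format("jdbc")
  .option("url", "jdbc:sqlserver://<server>:1433;databaseName=<db>") // placeholder
  .option("dbtable", "dbo.person_events")                            // hypothetical table name
  .option("user", "<user>")                                          // placeholder
  .option("password", "<password>")                                  // placeholder
  .mode("append")
  .save()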