如何使用 Spark Streaming 中的对象列表解析 json 模式?
How to parse json schema with list of objects in spark streaming?
所以我有如下数据流:
{
"messageDetails":{
"id": "1",
"name": "2"
},
"messageMain":{
"date": "string",
"details": [{"val1":"abcd","val2":"efgh"},{"val1":"aaaa","val2":"bbbb"}]
}
这是一条消息示例。通常,我会定义如下模式:
val tableSchema: StructType = (new StructType)
.add("messageDetails", (new StructType)
.add("id", StringType)
.add("name", StringType))
.add("messageMain", (new StructType)
.add("date", StringType)
.add("details", ???) ????)
然后像这样阅读消息-
val df = spark.read.schema(tableSchema).json(rdd)
但是,我不确定如何定义 details
,因为它是对象列表而不是结构类型。如果有另一种方法,我不想简单地分解行.. 因为这样做的最终目标是写回 google BigQuery table 将 details
设置为 repeated record type
.
你想要 ArrayType
的 StructType
持有 val1
和 val2
StringType
的
例如
val itemSchema = (new StructType)
.add("val1", StringType)
.add("val2", StringType)
val detailsSchema = new ArrayType(itemSchema, false)
val tableSchema: StructType = (new StructType)
.add("messageDetails", (new StructType)
.add("id", StringType)
.add("name", StringType))
.add("messageMain", (new StructType)
.add("date", StringType)
.add("details", detailsSchema))
所以我有如下数据流:
{
"messageDetails":{
"id": "1",
"name": "2"
},
"messageMain":{
"date": "string",
"details": [{"val1":"abcd","val2":"efgh"},{"val1":"aaaa","val2":"bbbb"}]
}
这是一条消息示例。通常,我会定义如下模式:
val tableSchema: StructType = (new StructType)
.add("messageDetails", (new StructType)
.add("id", StringType)
.add("name", StringType))
.add("messageMain", (new StructType)
.add("date", StringType)
.add("details", ???) ????)
然后像这样阅读消息-
val df = spark.read.schema(tableSchema).json(rdd)
但是,我不确定如何定义 details
,因为它是对象列表而不是结构类型。如果有另一种方法,我不想简单地分解行.. 因为这样做的最终目标是写回 google BigQuery table 将 details
设置为 repeated record type
.
你想要 ArrayType
的 StructType
持有 val1
和 val2
StringType
的
例如
val itemSchema = (new StructType)
.add("val1", StringType)
.add("val2", StringType)
val detailsSchema = new ArrayType(itemSchema, false)
val tableSchema: StructType = (new StructType)
.add("messageDetails", (new StructType)
.add("id", StringType)
.add("name", StringType))
.add("messageMain", (new StructType)
.add("date", StringType)
.add("details", detailsSchema))