Reading array fields in Spark 2.2
Suppose you have a bunch of data whose rows look like this:
{
  'key': [
    {'key1': 'value11', 'key2': 'value21'},
    {'key1': 'value12', 'key2': 'value22'}
  ]
}
I'd like to read this into a Spark Dataset. One way to do that is as follows:
case class ObjOfLists(k1: List[String], k2: List[String])
case class Data(k: ObjOfLists)
Then you can do:
sparkSession.read.json(pathToData).select(
  struct($"key.key1" as "k1", $"key.key2" as "k2") as "k"
).as[Data]
This works fine, but it mangles the data a bit; after all, in the data 'key' points to a list of objects, not an object of lists. In other words, what I really want is:
case class Obj(k1: String, k2: String)
case class DataOfList(k: List[Obj])
My question: is there some other syntax I can put in the select that allows the resulting Dataframe to be converted to a Dataset[DataOfList]?
I tried using the same select syntax as above and got:
Exception in thread "main" org.apache.spark.sql.AnalysisException: need an array field but got struct<k1:array<string>,k2:array<string>>;
So I also tried:
sparkSession.read.json(pathToData).select(
  array(struct($"key.key1" as "k1", $"key.key2" as "k2")) as "k"
).as[DataOfList]
This compiles and runs, but the data looks like this:
DataOfList(List(Obj(org.apache.spark.sql.catalyst.expressions.UnsafeArrayData@bb2a5516,org.apache.spark.sql.catalyst.expressions.UnsafeArrayData@bec5e4a7)))
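For context, the garbled output can be traced to the schema of that projection. A hypothetical diagnostic (assuming `spark` is the SparkSession and `df` is the DataFrame read from `pathToData`) might look like:

```scala
// Hypothetical diagnostic: inspect the schema of the array(struct(...)) projection.
import org.apache.spark.sql.functions.{array, struct}
import spark.implicits._

val df = spark.read.json(pathToData)
df.select(array(struct($"key.key1" as "k1", $"key.key2" as "k2")) as "k")
  .printSchema()
// Because `key` is an array of structs, $"key.key1" extracted from it is itself
// an array<string>, so `k` ends up as a single-element
// array<struct<k1:array<string>,k2:array<string>>>. The encoder for
// Obj(k1: String, k2: String) cannot line those fields up, which is why the
// raw UnsafeArrayData leaks into the output.
```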
Any other ideas?
Just recast the data to reflect the expected names:
case class Obj(k1: String, k2: String)
case class DataOfList(k: Seq[Obj])
val text = Seq("""{
  "key": [
    {"key1": "value11", "key2": "value21"},
    {"key1": "value12", "key2": "value22"}
  ]
}""").toDS
val df = spark.read.json(text)
df
.select($"key".cast("array<struct<k1:string,k2:string>>").as("k"))
.as[DataOfList]
.first
DataOfList(List(Obj(value11,value21), Obj(value12,value22)))
For extraneous fields, define the schema at read time:
val textExtended = Seq("""{
  "key": [
    {"key0": "value01", "key1": "value11", "key2": "value21"},
    {"key1": "value12", "key2": "value22", "key3": "value32"}
  ]
}""").toDS
import org.apache.spark.sql.types._

val schemaSubset = StructType(Seq(StructField("key", ArrayType(StructType(Seq(
  StructField("key1", StringType),
  StructField("key2", StringType))))
)))
val df = spark.read.schema(schemaSubset).json(textExtended)
Then proceed as before.
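Putting the pieces together, here is one self-contained sketch of the whole approach in spark-shell style (the SparkSession setup below is illustrative and not part of the original answer; in spark-shell, `spark` and its implicits already exist):

```scala
import org.apache.spark.sql.SparkSession

// Illustrative local session; in spark-shell this already exists as `spark`.
val spark = SparkSession.builder.master("local[*]").appName("read-array-fields").getOrCreate()
import spark.implicits._

case class Obj(k1: String, k2: String)
case class DataOfList(k: Seq[Obj])

val text = Seq("""{
  "key": [
    {"key1": "value11", "key2": "value21"},
    {"key1": "value12", "key2": "value22"}
  ]
}""").toDS

// Renaming the nested fields via a cast lets the DataOfList encoder apply.
val result = spark.read.json(text)
  .select($"key".cast("array<struct<k1:string,k2:string>>").as("k"))
  .as[DataOfList]
  .first
println(result)

spark.stop()
```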