Spark processing of polymorphic JSON
Consider this JSON input (shown here on multiple lines for readability, but the actual input is one record per line, CR-separated):
{
"common": { "type":"A", "date":"2020-01-01T12:00:00" },
"data": {
"name":"Dave",
"pets": [ "dog", "cat" ]
}
}
{
"common": { "type": "B", "date":"2020-01-01T12:00:00" },
"data": {
"whatever": { "X": {"foo":3}, "Y":"bar" },
"favoriteInts": [ 0, 1, 7]
}
}
I'm familiar with json-schema and with how I would describe the data substructure there: it is either name,pets or whatever,favoriteInts. We use the common.type field to identify the type at runtime.
Is this possible in a Spark schema definition? A first experiment:
schema = StructType([
StructField("common", StructType(common_schema)), # .. because the type is consistent
StructField("data", StructType()) # attempting to declare a "generic" struct
])
df = spark.read.option("multiline", "true").json(source, schema)
does not work; instead of reading whatever the data struct contains (two fields in this particular example), we get:
+--------------------+----+
| common|data|
+--------------------+----+
|{2020-01-01T12:00...| {}|
+--------------------+----+
and attempting to extract any named field yields No such struct field <whatever>. Leaving the "generic struct" out of the schema def entirely produces a dataframe with no data field at all, let alone the fields inside it.
Beyond that, what I'm ultimately looking to do is something like this:
df = spark.read.json(source)
def processA(frame):
frame.select( frame.data.name ) # we KNOW name exists for type A
...
def processB(frame):
frame.select( frame.data.favoriteInts ) # we KNOW favoriteInts exists for type B
...
processA( df.filter(df.common.type == "A") )
processB( df.filter(df.common.type == "B") )
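To make the intent of that dispatch concrete outside Spark, here is a minimal plain-Python sketch of the same type-tag pattern using only the stdlib json module (the function names process_a/process_b are hypothetical stand-ins for the processing steps above):

```python
import json

# The two single-line records described above (CR-separated in the real input).
lines = [
    '{"common": {"type": "A", "date": "2020-01-01T12:00:00"}, '
    '"data": {"name": "Dave", "pets": ["dog", "cat"]}}',
    '{"common": {"type": "B", "date": "2020-01-01T12:00:00"}, '
    '"data": {"whatever": {"X": {"foo": 3}, "Y": "bar"}, "favoriteInts": [0, 1, 7]}}',
]

records = [json.loads(line) for line in lines]

def process_a(recs):
    # We KNOW name exists for type A.
    return [r["data"]["name"] for r in recs]

def process_b(recs):
    # We KNOW favoriteInts exists for type B.
    return [r["data"]["favoriteInts"] for r in recs]

# Dispatch on the runtime type tag, mirroring df.filter(df.common.type == ...).
names = process_a([r for r in records if r["common"]["type"] == "A"])
ints = process_b([r for r in records if r["common"]["type"] == "B"])
print(names)  # ['Dave']
print(ints)   # [[0, 1, 7]]
```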
You can use nested and nullable types in the struct (by specifying True) to accommodate the uncertainty.
from pyspark.sql.types import StructType, StringType, ArrayType, StructField, IntegerType
data_schema = StructType([
# Type A related attributes
StructField("name",StringType(),True), # True implies nullable
StructField("pets",ArrayType(StringType()),True),
# Type B related attributes
StructField("whatever",StructType([
StructField("X",StructType([
StructField("foo",IntegerType(),True)
]),True),
StructField("Y",StringType(),True)
]),True), # True implies nullable
StructField("favoriteInts",ArrayType(IntegerType()),True),
])
schema = StructType([
StructField("common", StructType(common_schema)), # .. because the type is consistent
StructField("data", data_schema)
])
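To illustrate how the merged, all-nullable schema behaves (a field absent from a given record simply reads back as null), here is a plain-Python analogue using dict.get, which returns None for missing keys the way Spark returns null for missing nullable fields. This is a sketch of the behavior, not the Spark API itself:

```python
import json

record_a = json.loads(
    '{"common": {"type": "A", "date": "2020-01-01T12:00:00"}, '
    '"data": {"name": "Dave", "pets": ["dog", "cat"]}}')
record_b = json.loads(
    '{"common": {"type": "B", "date": "2020-01-01T12:00:00"}, '
    '"data": {"whatever": {"X": {"foo": 3}, "Y": "bar"}, "favoriteInts": [0, 1, 7]}}')

# With the merged schema, every record exposes all four data fields;
# fields not present in a given record come back as null (None here).
def project(record):
    data = record["data"]
    return {
        "name": data.get("name"),
        "pets": data.get("pets"),
        "whatever": data.get("whatever"),
        "favoriteInts": data.get("favoriteInts"),
    }

row_a = project(record_a)
row_b = project(record_b)
print(row_a["name"], row_a["favoriteInts"])  # Dave None
print(row_b["name"], row_b["favoriteInts"])  # None [0, 1, 7]
```

Downstream, processA can rely on name being non-null only for type A rows, which is exactly why filtering on common.type before selecting is still needed.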