How to explode structs with pyspark explode()
How do I transform the JSON below into the relational rows that follow it? The part I am stuck on is that the PySpark explode()
function throws an exception due to a type mismatch. I have not found a way to coerce the data into a suitable format so that I can create a row from each object under the source
key in the sample_json
object.
JSON input
sample_json = """
{
"dc_id": "dc-101",
"source": {
"sensor-igauge": {
"id": 10,
"ip": "68.28.91.22",
"description": "Sensor attached to the container ceilings",
"temp":35,
"c02_level": 1475,
"geo": {"lat":38.00, "long":97.00}
},
"sensor-ipad": {
"id": 13,
"ip": "67.185.72.1",
"description": "Sensor ipad attached to carbon cylinders",
"temp": 34,
"c02_level": 1370,
"geo": {"lat":47.41, "long":-122.00}
},
"sensor-inest": {
"id": 8,
"ip": "208.109.163.218",
"description": "Sensor attached to the factory ceilings",
"temp": 40,
"c02_level": 1346,
"geo": {"lat":33.61, "long":-111.89}
},
"sensor-istick": {
"id": 5,
"ip": "204.116.105.67",
"description": "Sensor embedded in exhaust pipes in the ceilings",
"temp": 40,
"c02_level": 1574,
"geo": {"lat":35.93, "long":-85.46}
}
}
}"""
Desired output
dc_id source_name id description
-------------------------------------------------------------------------------
dc-101 sensor-igauge 10 Sensor attached to the container ceilings
dc-101 sensor-ipad 13 Sensor ipad attached to carbon cylinders
dc-101 sensor-inest 8 Sensor attached to the factory ceilings
dc-101 sensor-istick 5 Sensor embedded in exhaust pipes in the ceilings
PySpark code
from pyspark.sql.functions import *
df_sample_data = spark.read.json(sc.parallelize([sample_json]))
df_expanded = df_sample_data.withColumn("one_source",explode_outer(col("source")))
display(df_expanded)
Error
AnalysisException: cannot resolve 'explode(`source`)' due to data type
mismatch: input to function explode should be array or map type, not struct....
I put together this Databricks notebook to further demonstrate the challenge and show the error clearly. I will be able to use that notebook to test any suggestions offered here.
You cannot use explode
on a struct, but you can get the column names inside the struct source
(using df.select("source.*").columns
), build an array of structs containing the fields you want from each nested struct with a list comprehension, and then explode that array to get the desired result:
from pyspark.sql import functions as F

# df is the DataFrame parsed from sample_json above
# (df = spark.read.json(sc.parallelize([sample_json])))
df1 = df.select(
"dc_id",
F.explode(
F.array(*[
F.struct(
F.lit(s).alias("source_name"),
F.col(f"source.{s}.id").alias("id"),
F.col(f"source.{s}.description").alias("description")
)
for s in df.select("source.*").columns
])
).alias("sources")
).select("dc_id", "sources.*")
df1.show(truncate=False)
#+------+-------------+---+------------------------------------------------+
#|dc_id |source_name |id |description |
#+------+-------------+---+------------------------------------------------+
#|dc-101|sensor-igauge|10 |Sensor attached to the container ceilings |
#|dc-101|sensor-inest |8 |Sensor attached to the factory ceilings |
#|dc-101|sensor-ipad |13 |Sensor ipad attached to carbon cylinders |
#|dc-101|sensor-istick|5 |Sensor embedded in exhaust pipes in the ceilings|
#+------+-------------+---+------------------------------------------------+
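An alternative, if you want explode() itself to work, is to supply an explicit schema when reading the JSON that declares source as a MapType (sensor name mapped to a sensor struct) instead of letting Spark infer a struct of structs; explode() accepts maps directly, so no list comprehension is needed. This is a sketch, not part of the answer above, and the schema below is hand-written from the sample data:

from pyspark.sql import functions as F
from pyspark.sql.types import (DoubleType, LongType, MapType,
                               StringType, StructField, StructType)

# Hand-written schema for one sensor object (assumed from sample_json)
sensor = StructType([
    StructField("id", LongType()),
    StructField("ip", StringType()),
    StructField("description", StringType()),
    StructField("temp", LongType()),
    StructField("c02_level", LongType()),
    StructField("geo", StructType([
        StructField("lat", DoubleType()),
        StructField("long", DoubleType()),
    ])),
])

# Declare "source" as a map so explode() applies directly
schema = StructType([
    StructField("dc_id", StringType()),
    StructField("source", MapType(StringType(), sensor)),
])

df_map = spark.read.schema(schema).json(sc.parallelize([sample_json]))

# Exploding a map yields one row per entry, with key and value columns
df2 = df_map.select(
    "dc_id",
    F.explode("source").alias("source_name", "sensor")
).select("dc_id", "source_name", "sensor.id", "sensor.description")

df2.show(truncate=False)

With the map schema, the map keys become source_name directly, which should yield the same four rows as above.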