在 Pyspark 中展平 Json
Flatten Json in Pyspark
my_data=[
{'stationCode': 'NB001',
'summaries': [{'period': {'year': 2017}, 'rainfall': 449},
{'period': {'year': 2018}, 'rainfall': 352.4},
{'period': {'year': 2019}, 'rainfall': 253.2},
{'period': {'year': 2020}, 'rainfall': 283},
{'period': {'year': 2021}, 'rainfall': 104.2}]},
{'stationCode': 'NA003',
'summaries': [{'period': {'year': 2019}, 'rainfall': 58.2},
{'period': {'year': 2020}, 'rainfall': 628.2},
{'period': {'year': 2021}, 'rainfall': 120}]}]
在Pandas我可以:
import pandas as pd
from pandas import json_normalize
pd.concat([json_normalize(entry, 'summaries', 'stationCode')
for entry in my_data])
这将给我以下 table:
rainfall period.year stationCode
0 449.0 2017 NB001
1 352.4 2018 NB001
2 253.2 2019 NB001
3 283.0 2020 NB001
4 104.2 2021 NB001
0 58.2 2019 NA003
1 628.2 2020 NA003
2 120.0 2021 NA003
在pyspark中一行代码可以实现吗?
我已经尝试了下面的代码,它给了我同样的结果。但是太长了,有什么办法可以缩短吗?;
df=sc.parallelize(my_data)
df1=spark.read.json(df)
df1.select("stationCode","summaries.period.year","summaries.rainfall").display()
df1 = df1.withColumn("year_rainfall", F.arrays_zip("year", "rainfall"))
.withColumn("year_rainfall", F.explode("year_rainfall"))
.select("stationCode",
F.col("year_rainfall.rainfall").alias("Rainfall"),
F.col("year_rainfall.year").alias("Year"))
df1.display(20, False)
自我介绍 pyspark 等一些解释或良好的信息来源将不胜感激
考虑一个包含以下数据的示例 json 文件。
{
"Name": "TestName",
"Date": "2021-04-09",
"Readings": [
{
"Id": 1,
"Reading": 5.678,
"datetime": "2021-04-09 00:00:00"
},
{
"Id": 2,
"Reading": 3.692,
"datetime": "2020-04-09 00:00:00"
}
]
}
定义一个我们可以强制读取数据的架构。
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType, ArrayType
data_schema = StructType(fields=[
StructField('Name', StringType(), False),
StructField('Date', StringType(), True),
StructField(
'Readings', ArrayType(
StructType([
StructField('Id', IntegerType(), False),
StructField('Reading', DoubleType(), True),
StructField('datetime', StringType(), True)
])
)
)
])
现在我们可以使用我们的架构来读取目录中的 JSON 个文件
data_df = spark.read.json('/mnt/data/' + '*.json', schema=data_schema)
我们需要嵌套在“Readings”中的数据,因此我们可以使用 explode 来获取这些子列。
from pyspark.sql.functions import explode
data_df = data_df.select(
"Name",
explode("Readings").alias("ReadingsExplode")
).select("Name", "ReadingsExplode.*")
data_df.show()
这应该提供带有展平数据帧的所需输出。
我觉得你的内容很好并且可读。不过你也可以直接zip和explode:
out = (df1.select("stationCode",
F.explode(F.arrays_zip(*["summaries.period.year","summaries.rainfall"])))
.select("stationCode",F.col("col")['0'].alias("year"),F.col("col")['1'].alias("rainfall")))
out.show()
+-----------+----+--------+
|stationCode|year|rainfall|
+-----------+----+--------+
| NB001|2017| 449.0|
| NB001|2018| 352.4|
| NB001|2019| 253.2|
| NB001|2020| 283.0|
| NB001|2021| 104.2|
| NA003|2019| 58.2|
| NA003|2020| 628.2|
| NA003|2021| 120.0|
+-----------+----+--------+
my_data=[
{'stationCode': 'NB001',
'summaries': [{'period': {'year': 2017}, 'rainfall': 449},
{'period': {'year': 2018}, 'rainfall': 352.4},
{'period': {'year': 2019}, 'rainfall': 253.2},
{'period': {'year': 2020}, 'rainfall': 283},
{'period': {'year': 2021}, 'rainfall': 104.2}]},
{'stationCode': 'NA003',
'summaries': [{'period': {'year': 2019}, 'rainfall': 58.2},
{'period': {'year': 2020}, 'rainfall': 628.2},
{'period': {'year': 2021}, 'rainfall': 120}]}]
在Pandas我可以:
import pandas as pd
from pandas import json_normalize
pd.concat([json_normalize(entry, 'summaries', 'stationCode')
for entry in my_data])
这将给我以下 table:
rainfall period.year stationCode
0 449.0 2017 NB001
1 352.4 2018 NB001
2 253.2 2019 NB001
3 283.0 2020 NB001
4 104.2 2021 NB001
0 58.2 2019 NA003
1 628.2 2020 NA003
2 120.0 2021 NA003
在pyspark中一行代码可以实现吗?
我已经尝试了下面的代码,它给了我同样的结果。但是太长了,有什么办法可以缩短吗?;
df=sc.parallelize(my_data)
df1=spark.read.json(df)
df1.select("stationCode","summaries.period.year","summaries.rainfall").display()
df1 = df1.withColumn("year_rainfall", F.arrays_zip("year", "rainfall"))
.withColumn("year_rainfall", F.explode("year_rainfall"))
.select("stationCode",
F.col("year_rainfall.rainfall").alias("Rainfall"),
F.col("year_rainfall.year").alias("Year"))
df1.display(20, False)
自我介绍 pyspark 等一些解释或良好的信息来源将不胜感激
考虑一个包含以下数据的示例 json 文件。
{
"Name": "TestName",
"Date": "2021-04-09",
"Readings": [
{
"Id": 1,
"Reading": 5.678,
"datetime": "2021-04-09 00:00:00"
},
{
"Id": 2,
"Reading": 3.692,
"datetime": "2020-04-09 00:00:00"
}
]
}
定义一个我们可以强制读取数据的架构。
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType, ArrayType
data_schema = StructType(fields=[
StructField('Name', StringType(), False),
StructField('Date', StringType(), True),
StructField(
'Readings', ArrayType(
StructType([
StructField('Id', IntegerType(), False),
StructField('Reading', DoubleType(), True),
StructField('datetime', StringType(), True)
])
)
)
])
现在我们可以使用我们的架构来读取目录中的 JSON 个文件
data_df = spark.read.json('/mnt/data/' + '*.json', schema=data_schema)
我们需要嵌套在“Readings”中的数据,因此我们可以使用 explode 来获取这些子列。
from pyspark.sql.functions import explode
data_df = data_df.select(
"Name",
explode("Readings").alias("ReadingsExplode")
).select("Name", "ReadingsExplode.*")
data_df.show()
这应该提供带有展平数据帧的所需输出。
我觉得你的内容很好并且可读。不过你也可以直接zip和explode:
out = (df1.select("stationCode",
F.explode(F.arrays_zip(*["summaries.period.year","summaries.rainfall"])))
.select("stationCode",F.col("col")['0'].alias("year"),F.col("col")['1'].alias("rainfall")))
out.show()
+-----------+----+--------+
|stationCode|year|rainfall|
+-----------+----+--------+
| NB001|2017| 449.0|
| NB001|2018| 352.4|
| NB001|2019| 253.2|
| NB001|2020| 283.0|
| NB001|2021| 104.2|
| NA003|2019| 58.2|
| NA003|2020| 628.2|
| NA003|2021| 120.0|
+-----------+----+--------+