如何将嵌套 JSON 扩展到 AWS 胶水上的 Spark 数据框
How to expand nested JSON into Spark dataframe on AWS glue
使用以下市场营销 JSON 文件
{
"request_id": "xx",
"timeseries_stats": [
{
"timeseries_stat": {
"id": "xx",
"timeseries": [
{
"start_time": "xx",
"end_time": "xx",
"stats": {
"impressions": xx,
"swipes": xx,
"view_completion": xx,
"spend": xx
}
},
{
"start_time": "xx",
"end_time": "xx",
"stats": {
"impressions": xx,
"swipes": xx,
"view_completion": xx,
"spend": xx
}
}
我可以很容易地使用 pandas 解析它并获得格式为
的所需数据帧
start_time end_time impressions swipes view_completion spend
xx xx xx xx xx xx
xx xx xx xx xx xx
但需要在 AWS Glue 上的 spark 中完成。
使用
创建初始 spark 数据帧 (df) 后
rdd = sc.parallelize(JSON_resp['timeseries_stats'][0]['timeseries_stat']['timeseries'])
df = rdd.toDF()
我尝试按如下方式扩展 stats 键
df_expanded = df.select("start_time","end_time","stats.*")
错误:
AnalysisException: 'Can only star expand struct data types.
Attribute: `ArrayBuffer(stats)`;'
&
from pyspark.sql.functions import explode
df_expanded = df.select("start_time","end_time").withColumn("stats", explode(df.stats))
错误:
AnalysisException: 'The number of aliases supplied in the AS clause does not match the
number of columns output by the UDTF expected 2 aliases but got stats ;
对 spark 很陌生,对于这两种方法中的任何一种,我们都将不胜感激!
这是一个非常类似的问题:
除了我需要展平这个额外的统计键。
当您 explode
地图列时,它会给您两个列,因此 .withColumn
不起作用。将 explode
与 select
语句一起使用。
from pyspark.sql import functions as f
df.select('start_time', 'end_time', f.explode('stats')) \
.groupBy('start_time', 'end_time').pivot('key').agg(f.first('value')).show()
+----------+--------+-----------+-----+------+---------------+
|start_time|end_time|impressions|spend|swipes|view_completion|
+----------+--------+-----------+-----+------+---------------+
| yy| yy| yy| yy| yy| yy|
| xx| xx| xx| xx| xx| xx|
+----------+--------+-----------+-----+------+---------------+
使用以下市场营销 JSON 文件
{
"request_id": "xx",
"timeseries_stats": [
{
"timeseries_stat": {
"id": "xx",
"timeseries": [
{
"start_time": "xx",
"end_time": "xx",
"stats": {
"impressions": xx,
"swipes": xx,
"view_completion": xx,
"spend": xx
}
},
{
"start_time": "xx",
"end_time": "xx",
"stats": {
"impressions": xx,
"swipes": xx,
"view_completion": xx,
"spend": xx
}
}
我可以很容易地使用 pandas 解析它并获得格式为
的所需数据帧start_time end_time impressions swipes view_completion spend
xx xx xx xx xx xx
xx xx xx xx xx xx
但需要在 AWS Glue 上的 spark 中完成。
使用
创建初始 spark 数据帧 (df) 后rdd = sc.parallelize(JSON_resp['timeseries_stats'][0]['timeseries_stat']['timeseries'])
df = rdd.toDF()
我尝试按如下方式扩展 stats 键
df_expanded = df.select("start_time","end_time","stats.*")
错误:
AnalysisException: 'Can only star expand struct data types.
Attribute: `ArrayBuffer(stats)`;'
&
from pyspark.sql.functions import explode
df_expanded = df.select("start_time","end_time").withColumn("stats", explode(df.stats))
错误:
AnalysisException: 'The number of aliases supplied in the AS clause does not match the
number of columns output by the UDTF expected 2 aliases but got stats ;
对 spark 很陌生,对于这两种方法中的任何一种,我们都将不胜感激!
这是一个非常类似的问题:
除了我需要展平这个额外的统计键。
当您 explode
地图列时,它会给您两个列,因此 .withColumn
不起作用。将 explode
与 select
语句一起使用。
from pyspark.sql import functions as f
df.select('start_time', 'end_time', f.explode('stats')) \
.groupBy('start_time', 'end_time').pivot('key').agg(f.first('value')).show()
+----------+--------+-----------+-----+------+---------------+
|start_time|end_time|impressions|spend|swipes|view_completion|
+----------+--------+-----------+-----+------+---------------+
| yy| yy| yy| yy| yy| yy|
| xx| xx| xx| xx| xx| xx|
+----------+--------+-----------+-----+------+---------------+