如何将嵌套 JSON 扩展到 AWS 胶水上的 Spark 数据框

How to expand nested JSON into Spark dataframe on AWS glue

使用以下市场营销 JSON 文件

{
    "request_id": "xx",
    "timeseries_stats": [
        {
            "timeseries_stat": {
                "id": "xx",
                "timeseries": [
                    {
                        "start_time": "xx",
                        "end_time": "xx",
                        "stats": {
                            "impressions": xx,
                            "swipes": xx,
                            "view_completion": xx,
                            "spend": xx
                        }
                    },
                    {
                        "start_time": "xx",
                        "end_time": "xx",
                        "stats": {
                            "impressions": xx,
                            "swipes": xx,
                            "view_completion": xx,
                            "spend": xx
                        }
                    }

我可以很容易地使用 pandas 解析它并获得格式为

的所需数据帧
start_time   end_time     impressions   swipes   view_completion    spend
    xx          xx             xx         xx            xx            xx
    xx          xx             xx         xx            xx            xx

但需要在 AWS Glue 上的 spark 中完成。

使用

创建初始 spark 数据帧 (df) 后
rdd = sc.parallelize(JSON_resp['timeseries_stats'][0]['timeseries_stat']['timeseries'])
df = rdd.toDF()

我尝试按如下方式扩展 stats

df_expanded = df.select("start_time","end_time","stats.*")

错误:

AnalysisException: 'Can only star expand struct data types. 
Attribute: `ArrayBuffer(stats)`;'

&

from pyspark.sql.functions import explode
df_expanded = df.select("start_time","end_time").withColumn("stats", explode(df.stats))

错误:

AnalysisException: 'The number of aliases supplied in the AS clause does not match the 
number of columns output by the UDTF expected 2 aliases but got stats ;

对 spark 很陌生,对于这两种方法中的任何一种,我们都将不胜感激!

这是一个非常类似的问题:

除了我需要展平这个额外的统计键。

当您 explode 地图列时,它会给您两个列,因此 .withColumn 不起作用。将 explodeselect 语句一起使用。

from pyspark.sql import functions as f

df.select('start_time', 'end_time', f.explode('stats')) \
  .groupBy('start_time', 'end_time').pivot('key').agg(f.first('value')).show()

+----------+--------+-----------+-----+------+---------------+
|start_time|end_time|impressions|spend|swipes|view_completion|
+----------+--------+-----------+-----+------+---------------+
|        yy|      yy|         yy|   yy|    yy|             yy|
|        xx|      xx|         xx|   xx|    xx|             xx|
+----------+--------+-----------+-----+------+---------------+