How to "explode" data in a cell formatted as a string so I can turn the keys into columns in PySpark?

One of the datasets I am working with has a column called json_data, which contains data like this:

{
    "eta": "",
    "eta_value": 0,
    "schedules": [{
        "open_time": "10:15:00",
        "close_time": "14:00:00"
    }, {
        "open_time": "18:00:00",
        "close_time": "20:00:00"
    }],
    "logo": "1617723892776.png",
    "score_v2": 0,
    "id": "900371722_8339714",
    "store_id": 900371722,
    "super_store_id": 900371722,
    "index": 375,
    "brand_name": "Carl's Restaurant",
    "store_type": "restaurant",
    "has_promise": false,
    "tags": [189],
    "background": "1618349497.jpg",
    "is_enabled": false,
    "friendly_url": {
        "store_id": 90037172
    }
}

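This column is of string type, which means I can't easily turn the information inside it into columns. That is what brings me here: how can I convert the data in it into columns? Especially the nested data in "schedules".

I'm having a hard time working with this column.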

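I was struggling with a similar JSON structure a few months back. Glad you brought it up, it helped me refresh my memory!
I solved it with the following steps -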

Input Data

df1.show(truncate=False)

+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|jsondata|

|{     "eta": "",     "eta_value": 0,     "schedules": [{         "open_time": "10:15:00",         "close_time": "14:00:00"     }, {         "open_time": "18:00:00",         "close_time": "20:00:00"     }],     "logo": "1617723892776.png",     "score_v2": 0,     "id": "900371722_8339714",     "store_id": 900371722,     "super_store_id": 900371722,     "index": 375,     "brand_name": "Carl's Restaurant",     "store_type": "restaurant",     "has_promise": false,     "tags": [189],     "background": "1618349497.jpg",     "is_enabled": false,     "friendly_url": {         "store_id": 90037172     } }|
+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+

Convert the jsondata column to a MapType as follows -

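from pyspark.sql.functions import explode, first, from_json
from pyspark.sql.types import ArrayType, MapType, StringType

# Parse the JSON string into a map of string keys to string values
df2 = df1.withColumn("cols", from_json("jsondata", MapType(StringType(), StringType()))).drop("jsondata")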
df2.show(truncate=False)

+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|cols                                                                                                                                                                                                                                                                                                                                                                                                                                                                      |
+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|{eta -> , eta_value -> 0, schedules -> [{"open_time":"10:15:00","close_time":"14:00:00"},{"open_time":"18:00:00","close_time":"20:00:00"}], logo -> 1617723892776.png, score_v2 -> 0, id -> 900371722_8339714, store_id -> 900371722, super_store_id -> 900371722, index -> 375, brand_name -> Carl's Restaurant, store_type -> restaurant, has_promise -> false, tags -> [189], background -> 1618349497.jpg, is_enabled -> false, friendly_url -> {"store_id":90037172}}|
+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+

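Now, the cols column needs to be exploded as follows -

# Exploding a map yields one row per entry, with the key and the value in separate columns
df3 = df2.select(explode("cols").alias("col_columns", "col_rows"))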
df3.show(truncate=False)

+--------------+---------------------------------------------------------------------------------------------------+
|col_columns   |col_rows                                                                                           |
+--------------+---------------------------------------------------------------------------------------------------+
|eta           |                                                                                                   |
|eta_value     |0                                                                                                  |
|schedules     |[{"open_time":"10:15:00","close_time":"14:00:00"},{"open_time":"18:00:00","close_time":"20:00:00"}]|
|logo          |1617723892776.png                                                                                  |
|score_v2      |0                                                                                                  |
|id            |900371722_8339714                                                                                  |
|store_id      |900371722                                                                                          |
|super_store_id|900371722                                                                                          |
|index         |375                                                                                                |
|brand_name    |Carl's Restaurant                                                                                  |
|store_type    |restaurant                                                                                         |
|has_promise   |false                                                                                              |
|tags          |[189]                                                                                              |
|background    |1618349497.jpg                                                                                     |
|is_enabled    |false                                                                                              |
|friendly_url  |{"store_id":90037172}                                                                              |
+--------------+---------------------------------------------------------------------------------------------------+

Once you have col_columns and col_rows as separate columns, all that is left is to pivot on col_columns and aggregate it using its corresponding first col_rows value, as below -

df4 = (df3.groupBy()
         .pivot("col_columns")
         .agg(first("col_rows"))
)
df4.show(truncate=False)

Output

+--------------+-----------------+---+---------+---------------------+-----------+-----------------+-----+----------+-----------------+---------------------------------------------------------------------------------------------------+--------+---------+----------+--------------+-----+
|background    |brand_name       |eta|eta_value|friendly_url         |has_promise|id               |index|is_enabled|logo             |schedules                                                                                          |score_v2|store_id |store_type|super_store_id|tags |
+--------------+-----------------+---+---------+---------------------+-----------+-----------------+-----+----------+-----------------+---------------------------------------------------------------------------------------------------+--------+---------+----------+--------------+-----+
|1618349497.jpg|Carl's Restaurant|   |0        |{"store_id":90037172}|false      |900371722_8339714|375  |false     |1617723892776.png|[{"open_time":"10:15:00","close_time":"14:00:00"},{"open_time":"18:00:00","close_time":"20:00:00"}]|0       |900371722|restaurant|900371722     |[189]|
+--------------+-----------------+---+---------+---------------------+-----------+-----------------+-----+----------+-----------------+---------------------------------------------------------------------------------------------------+--------+---------+----------+--------------+-----+

P.S. - If you also want to explode columns such as schedules and friendly_url, you may need to repeat the steps above, as shown below -

df4 = df4.withColumn("schedule_json", from_json("schedules", ArrayType(MapType(StringType(),StringType()))))

df4.select(explode("schedule_json").alias("schedules")).select(explode("schedules")).show(truncate=False)

+----------+--------+
|key       |value   |
+----------+--------+
|open_time |10:15:00|
|close_time|14:00:00|
|open_time |18:00:00|
|close_time|20:00:00|
+----------+--------+