从 pyspark 中的 MapType 列获取键并在导航中使用它

get keys from MapType column in pyspark and use it in navigation

我有一个具有以下架构的 spark df:

|-- col1 : string
|-- col2 : string
|-- data: struct
|    |-- items: map (nullable = true)
|    |    |-- key: string
|    |    |-- value: struct
|    |    |    |-- id: string 
|    |    |    |-- legalNature

我收到了不同的 json,在 items 对象下面有不同的键, 这里的例子:

+-------+---------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|   col1| col2    | data                                                                                                                                                                                   |
+-------+---------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| xxx   |  yyy    |{"data":{"items":{"71f3e2e4-5c3a-466d-8063-7bfd753b303c":{"id":"123","legalNature":"legalNature","allowedSignature":[],"category":"TP01","createdOn":"2021-01-22T13:17:12.502+01:00"}}}}|
| ...   |  ...    |{"data":{"items":{"86c6b41e-085c-eb11-a812-000d3ab29c25":{"id":"153","legalNature":"legalNature","allowedSignature":[],"category":"TP01","createdOn":"2021-01-22T13:17:12.502+01:00"}}}}|
|       |         |{"data":{"items":{"56c6b41e-085c-eb11-a812-000d3ab29c24":{"id":"173","legalNature":"legalNature","allowedSignature":[],"category":"TP01","createdOn":"2021-01-22T13:17:12.502+01:00"}}}}|
|       |         |{"data":{"items":{"1843f179-3687-eb11-a812-0022489bac2c":{"id":"193","legalNature":"legalNature","allowedSignature":[],"category":"TP01","createdOn":"2021-01-22T13:17:12.502+01:00"}}}}|
|       |         |{"data":{"items":{"2643f179-3687-eb11-a812-0022489bac2a":{"id":"133","legalNature":"legalNature","allowedSignature":[],"category":"TP01","createdOn":"2021-01-22T13:17:12.502+01:00"}}}}|
|       |         |{"data":{"items":{"91f3e2e4-5c3a-466d-8063-7bfd753b345i":{"id":"143","legalNature":"legalNature","allowedSignature":[],"category":"TP01","createdOn":"2021-01-22T13:17:12.502+01:00"}}}}|
+-------+---------|----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+

Items 是类型 MapType(StringType(), itemsSchema) 的结构,因为来自地图类型的键字符串可能会在我得到的每个 json 中发生变化,我如何才能按顺序动态导航我的 json 模式获取项目结构中的字段?

例如,我需要这样的东西来执行 select 操作:

df
    .select(
      col("col1"),
      col("col2").alias("my_col_2"),
      col(f"data.items.{itemsKey}.legalNature"),
      col(f"data.items.{itemsKey}.id"))
)

我的 df 中每个 json 的 itemsKey 都会发生变化。 我已经看到要获取密钥,我可以使用 map_keys 函数,如下所示:

  df.select(map_keys("data.items"))

但问题是此函数返回的是数据框而不是字符串,更准确地说是每行具有不同 itemsKey 的数据框。 有没有办法动态获取我的itemsKey?

我希望我已经说清楚了,我们将不胜感激。

你应该使用 spark.sql

的分解函数
from pyspark.sql.functions import explode

df3 = df.select(df.*,explode(df.data.items))

这将给出结果

+-------+-------------------------------------------+--------------------------------------------+
|   col1| key                                       |value                     
+-------+-------------------------------------------+-----+---------------------------------------
|    ...| a04452cb-a909-47b0-ad5a-9bc44c6014e3|brown|{"legalNature":"legalNature","id": "123",..}|
+-------+----+-----+--------------------------------|---------------------------------------------|

要使用您的特定 itemId 进行过滤,只需使用过滤功能


filteredValue = df3.filter(df.key == "Your_item_id").select("value").first();

legalName = filteredValue["value"]["legalName"]
id = filteredValue["value"]["id"]
...
...