Pyspark 将 'key:value' 字符串元素的数组拆分为结构并在找到时提取一些值

Pyspark Split array of 'key:value' string elements to a struct and extract some values when found

我有一个具有此架构的 DataFrame:

root
 |-- id: string (nullable = false)
 |-- data_zone_array: array (nullable = true)
 |    |-- element: string (containsNull = true)

它实际上包含一个数组data_zone_array,其中包含几个或多或少可预测的字符串值(或根本none),其中它们的键和值由:分隔;看起来像这样 show(5) 输出:

id  |   data_zone_array
1   |   ['name:john', 'surname:wick', 'group:a', 'group:b', 'group:c']
2   |   ['name:joe', 'surname:linda', 'surname:boss', 'group:d', 'group:b']
3   |   ['name:david', 'group:a', 'age:7']
4   |   ['name:mary', 'surname:gilles']
5   |   ['name:charles', 'surname:paul', 'group:d', 'group:b', 'group:c', 'age:6', 'unplanned_attribute_165:thisvalue']

我想:

它会给出这种模式:

root
 |-- id: string (nullable = false)
 |-- name: string (nullable = true)
 |-- surname: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- other_attributes: struct (nullable = true)
 |    |-- <attrx>: array (containsNull = true)  
 |    |    |-- element: string(containsNull = true) 
 |    |-- <attry>: array (containsNull = true)
 |    |    |-- element: string(containsNull = true)
 |    |-- ......    

记录如下:

id  |   name        |   surname             |   other_attributes
1   |   'john'      |   ['wick']            |   {group:['a','b','c']}
2   |   'joe'       |   ['boss', 'linda']   |   {group:['b', 'd']}
3   |   'david'     |   <null>              |   {group: ['a'], age:['7']}
4   |   'mary'      |   ['gilles']          |   <null>
5   |   'charles'   |   ['paul']            |   {group: ['b','c','d'], age:['6'], unplanned_attribute_165:['thisvalue']}

知道如何执行此类操作吗?

这是一种方法。

首先,通过在 :.

上拆分,分解列 data_zone_array 并将键和值提取到单独的列 keyvalue

然后,按 idkey 分组并收集与每个键关联的值列表。并再次按 id 分组以创建地图 key -> [values].

最后,select 您想要的键作为列并使用 map_keys + filter + transform 从映射中过滤键的剩余部分以创建 other_attributes列。

import pyspark.sql.functions as F

df1 = (df.withColumn("data_zone_array", F.explode("data_zone_array"))
       .withColumn("key", F.split("data_zone_array", ":")[0])
       .withColumn("value", F.split("data_zone_array", ":")[1])
       .groupBy("id", "key").agg(F.collect_list("value").alias("values"))
       .groupBy("id").agg(F.map_from_arrays(F.collect_list("key"), F.collect_list("values")).alias("attributes"))
       .select("id",
               F.col("attributes")["name"].alias("name"),
               F.col("attributes")["surname"].alias("surname"),
               F.expr("""transform(
                          filter(map_keys(attributes), k -> k not in('name', 'surname')),
                           x -> struct(x as key, attributes[x] as value)
                      )""").alias("other_attributes")
               )
       )

df1.show(truncate=False)
# +---+---------+-------------+------------------------------------------------------------------------+
# |id |name     |surname      |other_attributes                                                        |
# +---+---------+-------------+------------------------------------------------------------------------+
# |5  |[charles]|[paul]       |[{group, [d, b, c]}, {age, [6]}, {unplanned_attribute_165, [thisvalue]}]|
# |1  |[john]   |[wick]       |[{group, [a, b, c]}]                                                    |
# |3  |[david]  |null         |[{group, [a]}, {age, [7]}]                                              |
# |2  |[joe]    |[linda, boss]|[{group, [d, b]}]                                                       |
# |4  |[mary]   |[gilles]     |[]                                                                      |
# +---+---------+-------------+------------------------------------------------------------------------+