将数组转换为结构 pyspark

convert array to struct pyspark

我是 pyspark 的新手,我有一个数据框,目前如下所示。

| col1                            | col2              |
+---------------------------------+-------------------+
| [(a, 0)], [(b,0)].....[(z,1)]   | [0, 0, ... 1]     |
| [(b, 0)], [(b,1)].....[(z,0)]   | [0, 1, ... 0]     |
| [(a, 0)], [(c, 1)].....[(z,0)]  | [0, 1, ... 0]     |

我将 col1.QueryNum 中的值提取到 col2 中,当我打印架构时,它是一个包含 col1.QueryNum.

中数字列表的数组

最终我的目标是将 col2 中的列表值转换为 pyspark 中的结构格式(参考所需的架构)。

当前架构

 |-- col1: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- types: string (nullable = true)
 |    |    |-- QueryNum: integer (nullable = true)
 |-- col2: array (nullable = true)
 |    |-- element: integer (containsNull = true)

所需架构

 |-- col2: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- val1: integer (nullable = true)
 |    |    |-- val2: integer (nullable = true)
                 .
                 .
                 .
 |    |    |-- val80: integer (nullable = true)

我尝试使用 from_json 但它并没有真正起作用。

如果你有固定的数组大小,你可以使用 list-comprehension:

创建结构
from pyspark.sql import functions as F

df1 = df.withColumn(
    "col2",
    F.array(
        F.struct(*[
            F.col("col1")[i]["QueryNum"].alias(f"val{i+1}") for i in range(2)
        ])
    )
)

df1.show()
#+----------------+--------+
#|col1            |col2    |
#+----------------+--------+
#|[[0, a], [0, b]]|[[0, 0]]|
#|[[0, b], [1, b]]|[[0, 1]]|
#|[[0, a], [1, c]]|[[0, 1]]|
#+----------------+--------+

df1.printSchema()
#root
#|-- col1: array (nullable = true)
#|    |-- element: struct (containsNull = true)
#|    |    |-- QueryNum: long (nullable = true)
#|    |    |-- types: string (nullable = true)
#|-- col2: array (nullable = false)
#|    |-- element: struct (containsNull = false)
#|    |    |-- val1: long (nullable = true)
#|    |    |-- val2: long (nullable = true)

但是请注意,在这种情况下不需要使用数组,因为您将始终在该数组中拥有一个结构。只需使用简单的结构:

df1 = df.withColumn(
    "col2",
    F.struct(*[
        F.col("col1")[i]["QueryNum"].alias(f"val{i+1}") for i in range(2)
    ])
)

或者如果您更喜欢地图类型:

df1 = df.withColumn(
    "col2",
    F.map_from_entries(
        F.expr("transform(col1, (x,i) -> struct('val' || (i+1) as name, x.QueryNum as value))")
    )
)