重新排列 StrucTtype 和嵌套数组类型

Rearranging StrucType and nested ArrayTypes

我有一个具有以下架构的数据框:

root
 |-- col2: integer (nullable = true)
 |-- col1: integer (nullable = true)
 |-- structCol3: struct (nullable = true)
 |    |-- structField2: boolean (nullable = true)
 |    |-- structField1: string (nullable = true)
 |-- structCol3: struct (nullable = true)
 |    |-- nestedArray: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- elem3: double (nullable = true)
 |    |    |    |-- elem2: string (nullable = true)
 |    |    |    |-- elem1: string (nullable = true)
 |    |-- structField2: integer (nullable = true)

并且由于兼容性问题,我正在尝试将其输出为镶木地板格式,但格式如下:

root
 |-- col1: integer (nullable = true) 
 |-- col2: integer (nullable = true)
 |-- structCol3: struct (nullable = true)
 |    |-- structField1: string (nullable = true)
 |    |-- structField2: boolean (nullable = true)
 |-- structCol4: struct (nullable = true)
 |    |-- nestedArray: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- elem1: string (nullable = true)
 |    |    |    |-- elem2: string (nullable = true)
 |    |    |    |-- elem3: double (nullable = true)
 |    |-- structField2: integer (nullable = true)

到目前为止,我已经成功地重新排列了结构中的列和字段,如下所示:

dfParquetOutput = df.select(
    "col1",
    "col2",
    struct(
        col("structCol3.structField1"), 
        col("structCol3.structField2")
    ).alias("structCol3"),
    struct(
        col("structCol4.nestedArray"),
        col("structCol4.structField2")
    ).alias("structCol4")
)

不幸的是,我正在努力寻找一种方法来重新排列 Array 内的 StructType 内的元素。我考虑过尝试使用 udf,但由于我对 spark 还很陌生,所以我没有成功。我还尝试使用预定义模式创建一个新数据框,但根据我的测试,列是根据位置而不是名称分配的。

有没有一种简单的方法可以重新排列数组中的结构。

你真的不能在这里避免udf(或RDD)。如果将数据定义为

from pyspark.sql.functions import udf, struct, col
from collections import namedtuple

Outer = namedtuple("Outer", ["structCol4"])
Inner = namedtuple("Inner", ["nestedArray", "structField2"])
Element = namedtuple("Element", ["col3", "col2", "col1"])

df = spark.createDataFrame([Outer(Inner([Element("3", "2", "1")], 1))])

你可以

@udf("array<struct<col1: string, col2: string, col3: string>>")
def reorder(arr):
    return [(col1, col2, col3) for col3, col2, col1 in arr]

result = df.withColumn(
    "structCol4", 
     struct(reorder("structCol4.nestedArray").alias("nestedArray"), col("structCol4.structField2")))

result.printSchema()
# root
#  |-- structCol4: struct (nullable = false)
#  |    |-- nestedArray: array (nullable = true)
#  |    |    |-- element: struct (containsNull = true)
#  |    |    |    |-- col1: string (nullable = true)
#  |    |    |    |-- col2: string (nullable = true)
#  |    |    |    |-- col3: string (nullable = true)
#  |    |-- structField2: long (nullable = true)
# 


result.show()
# +----------------+
# |      structCol4|
# +----------------+
# |[[[1, 2, 3]], 1]|
# +----------------+

对于深度嵌套的模式,您将在 udf 中重构完整的树,但此处不需要。