Rearranging StructType and nested ArrayTypes
I have a dataframe with the following schema:
root
|-- col2: integer (nullable = true)
|-- col1: integer (nullable = true)
|-- structCol3: struct (nullable = true)
| |-- structField2: boolean (nullable = true)
| |-- structField1: string (nullable = true)
|-- structCol4: struct (nullable = true)
| |-- nestedArray: array (nullable = true)
| | |-- element: struct (containsNull = true)
| | | |-- elem3: double (nullable = true)
| | | |-- elem2: string (nullable = true)
| | | |-- elem1: string (nullable = true)
| |-- structField2: integer (nullable = true)
Due to compatibility issues, I'm trying to write it out as Parquet, but with the schema in this shape:
root
|-- col1: integer (nullable = true)
|-- col2: integer (nullable = true)
|-- structCol3: struct (nullable = true)
| |-- structField1: string (nullable = true)
| |-- structField2: boolean (nullable = true)
|-- structCol4: struct (nullable = true)
| |-- nestedArray: array (nullable = true)
| | |-- element: struct (containsNull = true)
| | | |-- elem1: string (nullable = true)
| | | |-- elem2: string (nullable = true)
| | | |-- elem3: double (nullable = true)
| |-- structField2: integer (nullable = true)
So far I've managed to rearrange the top-level columns and the fields inside the structs like this:
from pyspark.sql.functions import struct, col

dfParquetOutput = df.select(
    "col1",
    "col2",
    struct(
        col("structCol3.structField1"),
        col("structCol3.structField2")
    ).alias("structCol3"),
    struct(
        col("structCol4.nestedArray"),
        col("structCol4.structField2")
    ).alias("structCol4")
)
Unfortunately, I'm struggling to find a way to rearrange the elements of the StructType inside the Array. I considered using a udf, but since I'm fairly new to Spark I haven't been able to make it work. I also tried creating a new dataframe with a predefined schema, but from my tests columns are assigned by position rather than by name.
Is there a simple way to rearrange the structs inside an array?
You can't really avoid a udf (or RDD) here. If you define the data as
from pyspark.sql.functions import udf, struct, col
from collections import namedtuple
Outer = namedtuple("Outer", ["structCol4"])
Inner = namedtuple("Inner", ["nestedArray", "structField2"])
Element = namedtuple("Element", ["col3", "col2", "col1"])
df = spark.createDataFrame([Outer(Inner([Element("3", "2", "1")], 1))])
you can
@udf("array<struct<col1: string, col2: string, col3: string>>")
def reorder(arr):
    return [(col1, col2, col3) for col3, col2, col1 in arr]
result = df.withColumn(
    "structCol4",
    struct(
        reorder("structCol4.nestedArray").alias("nestedArray"),
        col("structCol4.structField2")
    )
)
result.printSchema()
# root
# |-- structCol4: struct (nullable = false)
# | |-- nestedArray: array (nullable = true)
# | | |-- element: struct (containsNull = true)
# | | | |-- col1: string (nullable = true)
# | | | |-- col2: string (nullable = true)
# | | | |-- col3: string (nullable = true)
# | |-- structField2: long (nullable = true)
#
result.show()
# +----------------+
# | structCol4|
# +----------------+
# |[[[1, 2, 3]], 1]|
# +----------------+
With a deeply nested schema you'd have to reconstruct the complete tree inside the udf, but that's not needed here.
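The udf itself is just plain tuple re-packing; stripped of Spark, the same reordering can be sketched in pure Python (the `Element` namedtuple below mirrors the one used to build the example dataframe, with its fields stored in reversed order):

```python
from collections import namedtuple

# Fields stored in reversed order, as in the example dataframe above.
Element = namedtuple("Element", ["col3", "col2", "col1"])

def reorder(arr):
    # Unpack each element in its stored order (col3, col2, col1)
    # and emit a tuple in the desired order (col1, col2, col3).
    return [(col1, col2, col3) for col3, col2, col1 in arr]

print(reorder([Element("3", "2", "1")]))  # [('1', '2', '3')]
```

Spark serializes each array element to the udf as a `Row` (which, like a namedtuple, unpacks positionally), so the comprehension behaves identically inside the udf.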