重命名 Spark DataFrame 中的嵌套结构列
Rename nested struct columns in a Spark DataFrame
我正在尝试更改 Scala 中 DataFrame 列的名称。我可以轻松更改直接字段的列名,但在转换数组结构列时遇到困难。
下面是我的 DataFrame 架构。
|-- _VkjLmnVop: string (nullable = true)
|-- _KaTasLop: string (nullable = true)
|-- AbcDef: struct (nullable = true)
| |-- UvwXyz: struct (nullable = true)
| | |-- _MnoPqrstUv: string (nullable = true)
| | |-- _ManDevyIxyz: string (nullable = true)
但我需要如下架构
|-- vkj_lmn_vop: string (nullable = true)
|-- ka_tas_lop: string (nullable = true)
|-- abc_def: struct (nullable = true)
| |-- uvw_xyz: struct (nullable = true)
| | |-- mno_pqrst_uv: string (nullable = true)
| | |-- man_devy_ixyz: string (nullable = true)
对于非结构列,我将按以下方式更改列名称
def aliasAllColumns(df: DataFrame): DataFrame = {
df.select(df.columns.map { c =>
df.col(c)
.as(
c.replaceAll("_", "")
.replaceAll("([A-Z])", "_")
.toLowerCase
.replaceFirst("_", ""))
}: _*)
}
aliasAllColumns(file_data_df).show(1)
如何动态更改 Struct 列名称?
据我所知,直接重命名嵌套字段是不可能的。
从一侧,您可以尝试移动到一个平面物体。
但是,如果你需要保持结构,你可以玩spark.sql.functions.struct(*cols)。
Creates a new struct column.
Parameters: cols – list of column names (string) or list of Column expressions
您需要分解所有架构,生成您需要的别名,然后使用 struct
函数重新组合。
这不是最好的解决方案。但它是一些东西:)
Pd:我附上了 PySpark 文档,因为它包含比 Scala 文档更好的解释。
您可以创建一个递归方法来遍历 DataFrame 模式以重命名列:
import org.apache.spark.sql.types._
def renameAllCols(schema: StructType, rename: String => String): StructType = {
def recurRename(schema: StructType): Seq[StructField] = schema.fields.map{
case StructField(name, dtype: StructType, nullable, meta) =>
StructField(rename(name), StructType(recurRename(dtype)), nullable, meta)
case StructField(name, dtype: ArrayType, nullable, meta) if dtype.elementType.isInstanceOf[StructType] =>
StructField(rename(name), ArrayType(StructType(recurRename(dtype.elementType.asInstanceOf[StructType])), true), nullable, meta)
case StructField(name, dtype, nullable, meta) =>
StructField(rename(name), dtype, nullable, meta)
}
StructType(recurRename(schema))
}
使用以下示例对其进行测试:
import org.apache.spark.sql.functions._
import spark.implicits._
val renameFcn = (s: String) =>
s.replace("_", "").replaceAll("([A-Z])", "_").toLowerCase.dropWhile(_ == '_')
case class C(A_Bc: Int, D_Ef: Int)
val df = Seq(
(10, "a", C(1, 2), Seq(C(11, 12), C(13, 14)), Seq(101, 102)),
(20, "b", C(3, 4), Seq(C(15, 16)), Seq(103))
).toDF("_VkjLmnVop", "_KaTasLop", "AbcDef", "ArrStruct", "ArrInt")
val newDF = spark.createDataFrame(df.rdd, renameAllCols(df.schema, renameFcn))
newDF.printSchema
// root
// |-- vkj_lmn_vop: integer (nullable = false)
// |-- ka_tas_lop: string (nullable = true)
// |-- abc_def: struct (nullable = true)
// | |-- a_bc: integer (nullable = false)
// | |-- d_ef: integer (nullable = false)
// |-- arr_struct: array (nullable = true)
// | |-- element: struct (containsNull = true)
// | | |-- a_bc: integer (nullable = false)
// | | |-- d_ef: integer (nullable = false)
// |-- arr_int: array (nullable = true)
// | |-- element: integer (containsNull = false)
我正在尝试更改 Scala 中 DataFrame 列的名称。我可以轻松更改直接字段的列名,但在转换数组结构列时遇到困难。
下面是我的 DataFrame 架构。
|-- _VkjLmnVop: string (nullable = true)
|-- _KaTasLop: string (nullable = true)
|-- AbcDef: struct (nullable = true)
| |-- UvwXyz: struct (nullable = true)
| | |-- _MnoPqrstUv: string (nullable = true)
| | |-- _ManDevyIxyz: string (nullable = true)
但我需要如下架构
|-- vkj_lmn_vop: string (nullable = true)
|-- ka_tas_lop: string (nullable = true)
|-- abc_def: struct (nullable = true)
| |-- uvw_xyz: struct (nullable = true)
| | |-- mno_pqrst_uv: string (nullable = true)
| | |-- man_devy_ixyz: string (nullable = true)
对于非结构列,我将按以下方式更改列名称
def aliasAllColumns(df: DataFrame): DataFrame = {
df.select(df.columns.map { c =>
df.col(c)
.as(
c.replaceAll("_", "")
.replaceAll("([A-Z])", "_")
.toLowerCase
.replaceFirst("_", ""))
}: _*)
}
aliasAllColumns(file_data_df).show(1)
如何动态更改 Struct 列名称?
据我所知,直接重命名嵌套字段是不可能的。
从一侧,您可以尝试移动到一个平面物体。
但是,如果你需要保持结构,你可以玩spark.sql.functions.struct(*cols)。
Creates a new struct column.
Parameters: cols – list of column names (string) or list of Column expressions
您需要分解所有架构,生成您需要的别名,然后使用 struct
函数重新组合。
这不是最好的解决方案。但它是一些东西:)
Pd:我附上了 PySpark 文档,因为它包含比 Scala 文档更好的解释。
您可以创建一个递归方法来遍历 DataFrame 模式以重命名列:
import org.apache.spark.sql.types._
def renameAllCols(schema: StructType, rename: String => String): StructType = {
def recurRename(schema: StructType): Seq[StructField] = schema.fields.map{
case StructField(name, dtype: StructType, nullable, meta) =>
StructField(rename(name), StructType(recurRename(dtype)), nullable, meta)
case StructField(name, dtype: ArrayType, nullable, meta) if dtype.elementType.isInstanceOf[StructType] =>
StructField(rename(name), ArrayType(StructType(recurRename(dtype.elementType.asInstanceOf[StructType])), true), nullable, meta)
case StructField(name, dtype, nullable, meta) =>
StructField(rename(name), dtype, nullable, meta)
}
StructType(recurRename(schema))
}
使用以下示例对其进行测试:
import org.apache.spark.sql.functions._
import spark.implicits._
val renameFcn = (s: String) =>
s.replace("_", "").replaceAll("([A-Z])", "_").toLowerCase.dropWhile(_ == '_')
case class C(A_Bc: Int, D_Ef: Int)
val df = Seq(
(10, "a", C(1, 2), Seq(C(11, 12), C(13, 14)), Seq(101, 102)),
(20, "b", C(3, 4), Seq(C(15, 16)), Seq(103))
).toDF("_VkjLmnVop", "_KaTasLop", "AbcDef", "ArrStruct", "ArrInt")
val newDF = spark.createDataFrame(df.rdd, renameAllCols(df.schema, renameFcn))
newDF.printSchema
// root
// |-- vkj_lmn_vop: integer (nullable = false)
// |-- ka_tas_lop: string (nullable = true)
// |-- abc_def: struct (nullable = true)
// | |-- a_bc: integer (nullable = false)
// | |-- d_ef: integer (nullable = false)
// |-- arr_struct: array (nullable = true)
// | |-- element: struct (containsNull = true)
// | | |-- a_bc: integer (nullable = false)
// | | |-- d_ef: integer (nullable = false)
// |-- arr_int: array (nullable = true)
// | |-- element: integer (containsNull = false)