使用 PySpark 在 Spark DataFrame 中将嵌套结构列重命名为小写
Rename nested struct columns to all in lower case in a Spark DataFrame using PySpark
Scala 中已经有类似的解决方案,但我需要 PySpark 版本的解决方案。我是 Python 新手,需要您的帮助。
下面是 Scala 解决方案的链接,以便更好地理解需求。
我正在尝试用 Python 更改 DataFrame 列的名称。顶层字段的列名很容易更改,但在转换数组/结构体(struct)列内部的字段名时遇到了困难。
下面是我的 DataFrame 架构。
|-- VkjLmnVop: string (nullable = true)
|-- KaTasLop: string (nullable = true)
|-- AbcDef: struct (nullable = true)
| |-- UvwXyz: struct (nullable = true)
| | |-- MnoPqrstUv: string (nullable = true)
| | |-- ManDevyIxyz: string (nullable = true)
但我需要如下架构
|-- vkjlmnvop: string (nullable = true)
|-- kataslop: string (nullable = true)
|-- abcdef: struct (nullable = true)
| |-- uvwxyz: struct (nullable = true)
| | |-- mnopqrstuv: string (nullable = true)
| | |-- mandevyixyz: string (nullable = true)
如何动态更改 Struct 列名称?
我猜这就是你想要的。希望能帮助到你!
def get_column_wise_schema(df_string_schema, df_columns):
    """Map each top-level column name to its type string.

    The schema string is expected to look like
    ``DataFrame[col1: type1, col2: type2]`` and ``df_columns`` must list the
    column names in the same order as they appear in that string.

    Args:
        df_string_schema: String form of the DataFrame schema.
        df_columns: Sequence of top-level column names, in schema order.

    Returns:
        A dict mapping every column name to the text of its type.

    Raises:
        IndexError: If a column's ``name: `` marker cannot be found.
    """
    column_schema_dict = {}
    last_index = len(df_columns) - 1
    # Scan left to right so a marker can never match text that belongs to an
    # earlier column (e.g. column "y" matching inside a column named "x y").
    cursor = 0
    for index, current_col in enumerate(df_columns):
        # The first column follows "[", every later one follows ", ".
        start_key = ('[' if index == 0 else ', ') + current_col + ': '
        key_pos = df_string_schema.find(start_key, cursor)
        if key_pos == -1:
            raise IndexError(
                'column %r not found in schema string' % (current_col,)
            )
        type_start = key_pos + len(start_key)

        if index == last_index:
            # The last type runs up to the closing bracket of the schema.
            type_end = df_string_schema.rfind(']')
            if type_end < type_start:
                type_end = len(df_string_schema)
        else:
            next_key = ', ' + df_columns[index + 1] + ': '
            type_end = df_string_schema.find(next_key, type_start)
            if type_end == -1:
                # No terminator: keep the rest of the string.
                type_end = len(df_string_schema)

        column_schema_dict[current_col] = df_string_schema[type_start:type_end]
        cursor = type_end
    return column_schema_dict
def convert_colnames_to_lower(spark_df):
    """Select every column of ``spark_df`` under lower-cased names.

    Each column is cast to the lower-cased form of its own type string and
    aliased to its lower-cased name. The cast to the lower-cased type string
    is the step intended to produce lower-case nested struct field names.

    Args:
        spark_df: DataFrame whose ``columns`` and string form are read.

    Returns:
        The result of ``spark_df.select(...)`` over the cast/aliased columns.
    """
    # Imported here so the function does not depend on a module-level import
    # that may appear later in the file.
    import pyspark.sql.functions as spf

    columns = spark_df.columns
    column_wise_schema_dict = get_column_wise_schema(str(spark_df), columns)
    col_exprs = []
    for column_name in columns:
        # The type string must itself be lower-cased; otherwise the cast
        # keeps the original mixed-case nested field names.
        column_schema_lowercase = column_wise_schema_dict[column_name].lower()
        col_exprs.append(
            spf.col(column_name)
            .cast(column_schema_lowercase)
            .alias(column_name.lower())
        )
    return spark_df.select(*col_exprs)
# Demo: build a one-row DataFrame with mixed-case names, then convert it.
ds = {
    'AbcDef': {'UvwXyz': {'VkjLmnVop': 'abcd'}},
    'HijKS': 'fgds',
}
sample_rdd = sc.parallelize([ds])
df = spark.read.json(sample_rdd)

# Schema before conversion.
df.printSchema()
"""
root
|-- AbcDef: struct (nullable = true)
| |-- UvwXyz: struct (nullable = true)
| | |-- VkjLmnVop: string (nullable = true)
|-- HijKS: string (nullable = true)
"""

# Schema after conversion.
converted_df = convert_colnames_to_lower(df)
converted_df.printSchema()
"""
root
|-- abcdef: struct (nullable = true)
| |-- uvwxyz: struct (nullable = true)
| | |-- vkjlmnvop: string (nullable = true)
|-- hijks: string (nullable = true)
"""
我还找到了另一种逻辑类似、但代码行数更少的解决方案。
import pyspark.sql.functions as spf
# Demo: same sample data, lower-cased inline without helper functions.
ds = {
    'AbcDef': {'UvwXyz': {'VkjLmnVop': 'abcd'}},
    'HijKS': 'fgds',
}
df = spark.read.json(sc.parallelize([ds]))
df.printSchema()
"""
root
|-- AbcDef: struct (nullable = true)
| |-- UvwXyz: struct (nullable = true)
| | |-- VkjLmnVop: string (nullable = true)
|-- HijKS: string (nullable = true)
"""

# Step 1: rename the top-level columns to lower case.
for original_name in df.columns:
    df = df.withColumnRenamed(original_name, original_name.lower())

# Step 2: split the DataFrame's string form into "name: type" entries.
schema_entries = []
for piece in df.__str__().split(", "):
    without_prefix = piece.replace("DataFrame[", "")
    schema_entries.append(without_prefix.replace("]", ""))

# Step 3: cast each column to the lower-cased version of its type string.
for entry in schema_entries:
    name_and_type = entry.split(": ")
    column_name = name_and_type[0]
    data_type = name_and_type[1]
    lowered_type = data_type.lower()
    df = df.withColumn(column_name, spf.col(column_name).cast(lowered_type))

df.printSchema()
"""
root
|-- abcdef: struct (nullable = true)
| |-- uvwxyz: struct (nullable = true)
| | |-- vkjlmnvop: string (nullable = true)
|-- hijks: string (nullable = true)
"""
Scala 中已经有类似的解决方案,但我需要 PySpark 版本的解决方案。我是 Python 新手,需要您的帮助。
下面是 Scala 解决方案的链接,以便更好地理解需求。
我正在尝试用 Python 更改 DataFrame 列的名称。顶层字段的列名很容易更改,但在转换数组/结构体(struct)列内部的字段名时遇到了困难。
下面是我的 DataFrame 架构。
|-- VkjLmnVop: string (nullable = true)
|-- KaTasLop: string (nullable = true)
|-- AbcDef: struct (nullable = true)
| |-- UvwXyz: struct (nullable = true)
| | |-- MnoPqrstUv: string (nullable = true)
| | |-- ManDevyIxyz: string (nullable = true)
但我需要如下架构
|-- vkjlmnvop: string (nullable = true)
|-- kataslop: string (nullable = true)
|-- abcdef: struct (nullable = true)
| |-- uvwxyz: struct (nullable = true)
| | |-- mnopqrstuv: string (nullable = true)
| | |-- mandevyixyz: string (nullable = true)
如何动态更改 Struct 列名称?
我猜这就是你想要的。希望能帮助到你!
def get_column_wise_schema(df_string_schema, df_columns):
    """Map each top-level column name to its type string.

    The schema string is expected to look like
    ``DataFrame[col1: type1, col2: type2]`` and ``df_columns`` must list the
    column names in the same order as they appear in that string.

    Args:
        df_string_schema: String form of the DataFrame schema.
        df_columns: Sequence of top-level column names, in schema order.

    Returns:
        A dict mapping every column name to the text of its type.

    Raises:
        IndexError: If a column's ``name: `` marker cannot be found.
    """
    column_schema_dict = {}
    last_index = len(df_columns) - 1
    # Scan left to right so a marker can never match text that belongs to an
    # earlier column (e.g. column "y" matching inside a column named "x y").
    cursor = 0
    for index, current_col in enumerate(df_columns):
        # The first column follows "[", every later one follows ", ".
        start_key = ('[' if index == 0 else ', ') + current_col + ': '
        key_pos = df_string_schema.find(start_key, cursor)
        if key_pos == -1:
            raise IndexError(
                'column %r not found in schema string' % (current_col,)
            )
        type_start = key_pos + len(start_key)

        if index == last_index:
            # The last type runs up to the closing bracket of the schema.
            type_end = df_string_schema.rfind(']')
            if type_end < type_start:
                type_end = len(df_string_schema)
        else:
            next_key = ', ' + df_columns[index + 1] + ': '
            type_end = df_string_schema.find(next_key, type_start)
            if type_end == -1:
                # No terminator: keep the rest of the string.
                type_end = len(df_string_schema)

        column_schema_dict[current_col] = df_string_schema[type_start:type_end]
        cursor = type_end
    return column_schema_dict
def convert_colnames_to_lower(spark_df):
    """Select every column of ``spark_df`` under lower-cased names.

    Each column is cast to the lower-cased form of its own type string and
    aliased to its lower-cased name. The cast to the lower-cased type string
    is the step intended to produce lower-case nested struct field names.

    Args:
        spark_df: DataFrame whose ``columns`` and string form are read.

    Returns:
        The result of ``spark_df.select(...)`` over the cast/aliased columns.
    """
    # Imported here so the function does not depend on a module-level import
    # that may appear later in the file.
    import pyspark.sql.functions as spf

    columns = spark_df.columns
    column_wise_schema_dict = get_column_wise_schema(str(spark_df), columns)
    col_exprs = []
    for column_name in columns:
        # The type string must itself be lower-cased; otherwise the cast
        # keeps the original mixed-case nested field names.
        column_schema_lowercase = column_wise_schema_dict[column_name].lower()
        col_exprs.append(
            spf.col(column_name)
            .cast(column_schema_lowercase)
            .alias(column_name.lower())
        )
    return spark_df.select(*col_exprs)
# Demo: build a one-row DataFrame with mixed-case names, then convert it.
ds = {
    'AbcDef': {'UvwXyz': {'VkjLmnVop': 'abcd'}},
    'HijKS': 'fgds',
}
sample_rdd = sc.parallelize([ds])
df = spark.read.json(sample_rdd)

# Schema before conversion.
df.printSchema()
"""
root
|-- AbcDef: struct (nullable = true)
| |-- UvwXyz: struct (nullable = true)
| | |-- VkjLmnVop: string (nullable = true)
|-- HijKS: string (nullable = true)
"""

# Schema after conversion.
converted_df = convert_colnames_to_lower(df)
converted_df.printSchema()
"""
root
|-- abcdef: struct (nullable = true)
| |-- uvwxyz: struct (nullable = true)
| | |-- vkjlmnvop: string (nullable = true)
|-- hijks: string (nullable = true)
"""
我还找到了另一种逻辑类似、但代码行数更少的解决方案。
import pyspark.sql.functions as spf
# Demo: same sample data, lower-cased inline without helper functions.
ds = {
    'AbcDef': {'UvwXyz': {'VkjLmnVop': 'abcd'}},
    'HijKS': 'fgds',
}
df = spark.read.json(sc.parallelize([ds]))
df.printSchema()
"""
root
|-- AbcDef: struct (nullable = true)
| |-- UvwXyz: struct (nullable = true)
| | |-- VkjLmnVop: string (nullable = true)
|-- HijKS: string (nullable = true)
"""

# Step 1: rename the top-level columns to lower case.
for original_name in df.columns:
    df = df.withColumnRenamed(original_name, original_name.lower())

# Step 2: split the DataFrame's string form into "name: type" entries.
schema_entries = []
for piece in df.__str__().split(", "):
    without_prefix = piece.replace("DataFrame[", "")
    schema_entries.append(without_prefix.replace("]", ""))

# Step 3: cast each column to the lower-cased version of its type string.
for entry in schema_entries:
    name_and_type = entry.split(": ")
    column_name = name_and_type[0]
    data_type = name_and_type[1]
    lowered_type = data_type.lower()
    df = df.withColumn(column_name, spf.col(column_name).cast(lowered_type))

df.printSchema()
"""
root
|-- abcdef: struct (nullable = true)
| |-- uvwxyz: struct (nullable = true)
| | |-- vkjlmnvop: string (nullable = true)
|-- hijks: string (nullable = true)
"""