使用 'struct_name.*' 选择时为所有列添加前缀
Give prefix to all columns when selecting with 'struct_name.*'
下面的数据框是一个 temp_table 名称:'table_name' .
您将如何使用 spark.sql() 为所有列添加前缀?
root
|-- MAIN_COL: struct (nullable = true)
| |-- a: string (nullable = true)
| |-- b: string (nullable = true)
| |-- c: string (nullable = true)
| |-- d: string (nullable = true)
| |-- f: long (nullable = true)
| |-- g: long (nullable = true)
| |-- h: long (nullable = true)
| |-- j: long (nullable = true)
下面的查询
spark.sql("select MAIN_COL.* from table_name")
返回名为 a、b、c... 的列,但如何使它们看起来都像pre_a、pre_b、pre_c?
想避免一一选择并给它们起别名。如果我有 30 列呢?
我希望SQL中使用的自定义UDF可以解决它,但真的不知道如何处理。
# Generate a pandas DataFrame
import pandas as pd
a_dict={
'a':[1,2,3,4,5],
'b':[1,2,3,4,5],
'c':[1,2,3,4,5],
'e':list('abcde'),
'f':list('abcde'),
'g':list('abcde')
}
pandas_df=pd.DataFrame(a_dict)
# Create a Spark DataFrame from a pandas DataFrame using Arrow
spark.conf.set("spark.sql.execution.arrow.enabled", "true")
df = spark.createDataFrame(pandas_df)
#struct
from pyspark.sql.functions import struct
main=df.select(struct(df.columns).alias("MAIN_COL"))
您可以试试这个:根据要求将所有列添加到 schema2
val schema2 = new StructType()
.add("pre_a",StringType)
.add("pre_b",StringType)
.add("pre_c",StringType)
现在 select 列使用类似:
df.select(col("MAIN_COL").cast(schema2)).show()
它将为您提供所有更新的列名称。
这是一种遍历字段并动态修改其名称的方法。首先使用 main.schema.fields[0].dataType.fields
访问目标字段。接下来使用 python map
将 pre_
添加到每个字段:
from pyspark.sql.types import *
from pyspark.sql.functions import col
inner_fields = main.schema.fields[0].dataType.fields
# [StructField(a,LongType,true),
# StructField(b,LongType,true),
# StructField(c,LongType,true),
# StructField(e,StringType,true),
# StructField(f,StringType,true),
# StructField(g,StringType,true)]
pre_cols = list(map(lambda sf: StructField(f"pre_{sf.name}", sf.dataType, sf.nullable), inner_fields))
new_schema = StructType(pre_cols)
main.select(col("MAIN_COL").cast(new_schema)).printSchema()
# root
# |-- MAIN_COL: struct (nullable = false)
# | |-- pre_a: long (nullable = true)
# | |-- pre_b: long (nullable = true)
# | |-- pre_c: long (nullable = true)
# | |-- pre_e: string (nullable = true)
# | |-- pre_f: string (nullable = true)
# | |-- pre_g: string (nullable = true)
最后,您可以使用 cast
和@Mahesh 已经提到的新模式。
Spark 的美妙之处在于,您可以通过编程方式操作元数据
这是一个延续原始代码片段的示例:
main.createOrReplaceTempView("table_name")
new_cols_select = ", ".join(["MAIN_COL." + col + " as pre_" + col for col in spark.sql("select MAIN_COL.* from table_name").columns])
new_df = spark.sql(f"select {new_cols_select} from table_name")
由于 Spark 的惰性和因为所有操作都只是元数据,此代码几乎没有任何性能成本,并且对 10 列或 500 列同样有效(我们实际上是在 1k 列上做类似的事情) .
也可以使用 df.schema
object
以更优雅的方式获取原始列名
您也可以使用 PySpark 执行此操作:
df.select([col(col_name).alias('prefix' + col_name) for col_name in df])
下面的数据框是一个 temp_table 名称:'table_name' .
您将如何使用 spark.sql() 为所有列添加前缀?
root
|-- MAIN_COL: struct (nullable = true)
| |-- a: string (nullable = true)
| |-- b: string (nullable = true)
| |-- c: string (nullable = true)
| |-- d: string (nullable = true)
| |-- f: long (nullable = true)
| |-- g: long (nullable = true)
| |-- h: long (nullable = true)
| |-- j: long (nullable = true)
下面的查询
spark.sql("select MAIN_COL.* from table_name")
返回名为 a、b、c... 的列,但如何使它们看起来都像pre_a、pre_b、pre_c?
想避免一一选择并给它们起别名。如果我有 30 列呢?
我希望SQL中使用的自定义UDF可以解决它,但真的不知道如何处理。
# Generate a pandas DataFrame
import pandas as pd
a_dict={
'a':[1,2,3,4,5],
'b':[1,2,3,4,5],
'c':[1,2,3,4,5],
'e':list('abcde'),
'f':list('abcde'),
'g':list('abcde')
}
pandas_df=pd.DataFrame(a_dict)
# Create a Spark DataFrame from a pandas DataFrame using Arrow
spark.conf.set("spark.sql.execution.arrow.enabled", "true")
df = spark.createDataFrame(pandas_df)
#struct
from pyspark.sql.functions import struct
main=df.select(struct(df.columns).alias("MAIN_COL"))
您可以试试这个:根据要求将所有列添加到 schema2
val schema2 = new StructType()
.add("pre_a",StringType)
.add("pre_b",StringType)
.add("pre_c",StringType)
现在 select 列使用类似:
df.select(col("MAIN_COL").cast(schema2)).show()
它将为您提供所有更新的列名称。
这是一种遍历字段并动态修改其名称的方法。首先使用 main.schema.fields[0].dataType.fields
访问目标字段。接下来使用 python map
将 pre_
添加到每个字段:
from pyspark.sql.types import *
from pyspark.sql.functions import col
inner_fields = main.schema.fields[0].dataType.fields
# [StructField(a,LongType,true),
# StructField(b,LongType,true),
# StructField(c,LongType,true),
# StructField(e,StringType,true),
# StructField(f,StringType,true),
# StructField(g,StringType,true)]
pre_cols = list(map(lambda sf: StructField(f"pre_{sf.name}", sf.dataType, sf.nullable), inner_fields))
new_schema = StructType(pre_cols)
main.select(col("MAIN_COL").cast(new_schema)).printSchema()
# root
# |-- MAIN_COL: struct (nullable = false)
# | |-- pre_a: long (nullable = true)
# | |-- pre_b: long (nullable = true)
# | |-- pre_c: long (nullable = true)
# | |-- pre_e: string (nullable = true)
# | |-- pre_f: string (nullable = true)
# | |-- pre_g: string (nullable = true)
最后,您可以使用 cast
和@Mahesh 已经提到的新模式。
Spark 的美妙之处在于,您可以通过编程方式操作元数据
这是一个延续原始代码片段的示例:
main.createOrReplaceTempView("table_name")
new_cols_select = ", ".join(["MAIN_COL." + col + " as pre_" + col for col in spark.sql("select MAIN_COL.* from table_name").columns])
new_df = spark.sql(f"select {new_cols_select} from table_name")
由于 Spark 的惰性和因为所有操作都只是元数据,此代码几乎没有任何性能成本,并且对 10 列或 500 列同样有效(我们实际上是在 1k 列上做类似的事情) .
也可以使用 df.schema
object
您也可以使用 PySpark 执行此操作:
df.select([col(col_name).alias('prefix' + col_name) for col_name in df])