使用 'struct_name.*' 选择时为所有列添加前缀

Give prefix to all columns when selecting with 'struct_name.*'

下面的数据框是一个 temp_table 名称:'table_name' .
您将如何使用 spark.sql() 为所有列添加前缀?

root
 |-- MAIN_COL: struct (nullable = true)
 |    |-- a: string (nullable = true)
 |    |-- b: string (nullable = true)
 |    |-- c: string (nullable = true)
 |    |-- d: string (nullable = true)
 |    |-- f: long (nullable = true)
 |    |-- g: long (nullable = true)
 |    |-- h: long (nullable = true)
 |    |-- j: long (nullable = true)

下面的查询

spark.sql("select MAIN_COL.* from table_name")

返回名为 a、b、c... 的列,但如何使它们看起来都像pre_a、pre_b、pre_c?
想避免一一选择并给它们起别名。如果我有 30 列呢?

我希望SQL中使用的自定义UDF可以解决它,但真的不知道如何处理。

 # Generate a pandas DataFrame
import pandas as pd
a_dict={
    'a':[1,2,3,4,5],
    'b':[1,2,3,4,5],
    'c':[1,2,3,4,5],
    'e':list('abcde'),
    'f':list('abcde'),
    'g':list('abcde')
}
pandas_df=pd.DataFrame(a_dict)
# Create a Spark DataFrame from a pandas DataFrame using Arrow
spark.conf.set("spark.sql.execution.arrow.enabled", "true")
df = spark.createDataFrame(pandas_df)

#struct
from pyspark.sql.functions import struct
main=df.select(struct(df.columns).alias("MAIN_COL"))

您可以试试这个:根据要求将所有列添加到 schema2

val schema2 = new StructType()
    .add("pre_a",StringType)
    .add("pre_b",StringType)
    .add("pre_c",StringType) 

现在 select 列使用类似:

df.select(col("MAIN_COL").cast(schema2)).show()

它将为您提供所有更新的列名称。

这是一种遍历字段并动态修改其名称的方法。首先使用 main.schema.fields[0].dataType.fields 访问目标字段。接下来使用 python mappre_ 添加到每个字段:

from pyspark.sql.types import *
from pyspark.sql.functions import col

inner_fields = main.schema.fields[0].dataType.fields

# [StructField(a,LongType,true),
#  StructField(b,LongType,true),
#  StructField(c,LongType,true),
#  StructField(e,StringType,true),
#  StructField(f,StringType,true),
#  StructField(g,StringType,true)]

pre_cols = list(map(lambda sf: StructField(f"pre_{sf.name}", sf.dataType, sf.nullable), inner_fields))

new_schema = StructType(pre_cols)

main.select(col("MAIN_COL").cast(new_schema)).printSchema()

# root
#  |-- MAIN_COL: struct (nullable = false)
#  |    |-- pre_a: long (nullable = true)
#  |    |-- pre_b: long (nullable = true)
#  |    |-- pre_c: long (nullable = true)
#  |    |-- pre_e: string (nullable = true)
#  |    |-- pre_f: string (nullable = true)
#  |    |-- pre_g: string (nullable = true)

最后,您可以使用 cast 和@Mahesh 已经提到的新模式。

Spark 的美妙之处在于,您可以通过编程方式操作元数据

这是一个延续原始代码片段的示例:

main.createOrReplaceTempView("table_name")

new_cols_select = ", ".join(["MAIN_COL." + col + " as pre_" + col for col in spark.sql("select MAIN_COL.* from table_name").columns])

new_df = spark.sql(f"select {new_cols_select} from table_name")

由于 Spark 的惰性和因为所有操作都只是元数据,此代码几乎没有任何性能成本,并且对 10 列或 500 列同样有效(我们实际上是在 1k 列上做类似的事情) .

也可以使用 df.schema object

以更优雅的方式获取原始列名

您也可以使用 PySpark 执行此操作:

df.select([col(col_name).alias('prefix' + col_name) for col_name in df])