Pyspark:从 Struct 中识别 arrayType 列并调用 udf 将数组转换为字符串
Pyspark: Identify the arrayType column from the the Struct and call udf to convert array to string
我正在创建一个加速器,用于将数据从源迁移到目标。例如,我将从 API 中选择数据并将数据迁移到 csv。在将数据转换为 csv 时,我遇到了处理数组类型的问题。我使用了 withColumn 和 concat_ws 方法(即 df1=df.withColumn('films',F.concat_ws(':',F.col(" films"))) films 是此转换的数组类型列 ),并且有效。现在我希望它动态地发生。我的意思是,在不指定列名的情况下,有没有一种方法可以从具有数组类型的结构中选择列名,然后调用 udf?
感谢您的宝贵时间!
您可以使用 df.schema. Depending on the type of the column you can apply concat_ws 或不使用来获取列的类型:
data = [["test1", "test2", [1,2,3], ["a","b","c"]]]
schema= ["col1", "col2", "arr1", "arr2"]
df = spark.createDataFrame(data, schema)
array_cols = [F.concat_ws(":", c.name).alias(c.name) \
for c in df.schema if isinstance(c.dataType, T.ArrayType) ]
other_cols = [F.col(c.name) \
for c in df.schema if not isinstance(c.dataType, T.ArrayType) ]
df = df.select(other_cols + array_cols)
结果:
+-----+-----+-----+-----+
| col1| col2| arr1| arr2|
+-----+-----+-----+-----+
|test1|test2|1:2:3|a:b:c|
+-----+-----+-----+-----+
我正在创建一个加速器,用于将数据从源迁移到目标。例如,我将从 API 中选择数据并将数据迁移到 csv。在将数据转换为 csv 时,我遇到了处理数组类型的问题。我使用了 withColumn 和 concat_ws 方法(即 df1=df.withColumn('films',F.concat_ws(':',F.col(" films"))) films 是此转换的数组类型列 ),并且有效。现在我希望它动态地发生。我的意思是,在不指定列名的情况下,有没有一种方法可以从具有数组类型的结构中选择列名,然后调用 udf?
感谢您的宝贵时间!
您可以使用 df.schema. Depending on the type of the column you can apply concat_ws 或不使用来获取列的类型:
data = [["test1", "test2", [1,2,3], ["a","b","c"]]]
schema= ["col1", "col2", "arr1", "arr2"]
df = spark.createDataFrame(data, schema)
array_cols = [F.concat_ws(":", c.name).alias(c.name) \
for c in df.schema if isinstance(c.dataType, T.ArrayType) ]
other_cols = [F.col(c.name) \
for c in df.schema if not isinstance(c.dataType, T.ArrayType) ]
df = df.select(other_cols + array_cols)
结果:
+-----+-----+-----+-----+
| col1| col2| arr1| arr2|
+-----+-----+-----+-----+
|test1|test2|1:2:3|a:b:c|
+-----+-----+-----+-----+