Scala Spark - 将矢量列拆分为 Spark DataFrame 中的单独列
Scala Spark - split vector column into separate columns in a Spark DataFrame
我有一个 Spark DataFrame,其中有一列包含 Vector 值。矢量值都是 n 维的,也就是长度相同。我还有一个列名列表 Array("f1", "f2", "f3", ..., "fn")
,每个列名对应于向量中的一个元素。
some_columns... | Features
... | [0,1,0,..., 0]
to
some_columns... | f1 | f2 | f3 | ... | fn
... | 0 | 1 | 0 | ... | 0
实现此目标的最佳方法是什么?我想到了一种方法,即使用 createDataFrame(Row(Features), featureNameList)
创建一个新的 DataFrame,然后与旧的 DataFrame 连接,但它需要 spark 上下文才能使用 createDataFrame。我只想转换现有的数据框。我也知道 .withColumn("fi", value)
但是如果 n
很大怎么办?
我是 Scala 和 Spark 的新手,找不到任何好的例子。我认为这可能是一项常见的任务。我的特殊情况是我使用了 CountVectorizer
并希望单独恢复每一列以获得更好的可读性,而不是只有向量结果。
一种方法是将 vector
列转换为 array<double>
,然后使用 getItem
提取单个元素。
import org.apache.spark.sql.functions._
import org.apache.spark.ml._
val df = Seq( (1 , linalg.Vectors.dense(1,0,1,1,0) ) ).toDF("id", "features")
//df: org.apache.spark.sql.DataFrame = [id: int, features: vector]
df.show
//+---+---------------------+
//|id |features |
//+---+---------------------+
//|1 |[1.0,0.0,1.0,1.0,0.0]|
//+---+---------------------+
// A UDF to convert VectorUDT to ArrayType
val vecToArray = udf( (xs: linalg.Vector) => xs.toArray )
// Add a ArrayType Column
val dfArr = df.withColumn("featuresArr" , vecToArray($"features") )
// Array of element names that need to be fetched
// ArrayIndexOutOfBounds is not checked.
// sizeof `elements` should be equal to the number of entries in column `features`
val elements = Array("f1", "f2", "f3", "f4", "f5")
// Create a SQL-like expression using the array
val sqlExpr = elements.zipWithIndex.map{ case (alias, idx) => col("featuresArr").getItem(idx).as(alias) }
// Extract Elements from dfArr
dfArr.select(sqlExpr : _*).show
//+---+---+---+---+---+
//| f1| f2| f3| f4| f5|
//+---+---+---+---+---+
//|1.0|0.0|1.0|1.0|0.0|
//+---+---+---+---+---+
我有一个 Spark DataFrame,其中有一列包含 Vector 值。矢量值都是 n 维的,也就是长度相同。我还有一个列名列表 Array("f1", "f2", "f3", ..., "fn")
,每个列名对应于向量中的一个元素。
some_columns... | Features
... | [0,1,0,..., 0]
to
some_columns... | f1 | f2 | f3 | ... | fn
... | 0 | 1 | 0 | ... | 0
实现此目标的最佳方法是什么?我想到了一种方法,即使用 createDataFrame(Row(Features), featureNameList)
创建一个新的 DataFrame,然后与旧的 DataFrame 连接,但它需要 spark 上下文才能使用 createDataFrame。我只想转换现有的数据框。我也知道 .withColumn("fi", value)
但是如果 n
很大怎么办?
我是 Scala 和 Spark 的新手,找不到任何好的例子。我认为这可能是一项常见的任务。我的特殊情况是我使用了 CountVectorizer
并希望单独恢复每一列以获得更好的可读性,而不是只有向量结果。
一种方法是将 vector
列转换为 array<double>
,然后使用 getItem
提取单个元素。
import org.apache.spark.sql.functions._
import org.apache.spark.ml._
val df = Seq( (1 , linalg.Vectors.dense(1,0,1,1,0) ) ).toDF("id", "features")
//df: org.apache.spark.sql.DataFrame = [id: int, features: vector]
df.show
//+---+---------------------+
//|id |features |
//+---+---------------------+
//|1 |[1.0,0.0,1.0,1.0,0.0]|
//+---+---------------------+
// A UDF to convert VectorUDT to ArrayType
val vecToArray = udf( (xs: linalg.Vector) => xs.toArray )
// Add a ArrayType Column
val dfArr = df.withColumn("featuresArr" , vecToArray($"features") )
// Array of element names that need to be fetched
// ArrayIndexOutOfBounds is not checked.
// sizeof `elements` should be equal to the number of entries in column `features`
val elements = Array("f1", "f2", "f3", "f4", "f5")
// Create a SQL-like expression using the array
val sqlExpr = elements.zipWithIndex.map{ case (alias, idx) => col("featuresArr").getItem(idx).as(alias) }
// Extract Elements from dfArr
dfArr.select(sqlExpr : _*).show
//+---+---+---+---+---+
//| f1| f2| f3| f4| f5|
//+---+---+---+---+---+
//|1.0|0.0|1.0|1.0|0.0|
//+---+---+---+---+---+