Converting a vector column in a dataframe back into an array column
I have a dataframe with two columns, one of which (called dist) is a dense vector. How do I convert it back into an array column of integers?
+---+-----+
| id| dist|
+---+-----+
|1.0|[2.0]|
|2.0|[4.0]|
|3.0|[6.0]|
|4.0|[8.0]|
+---+-----+
I tried a couple of variants of the following UDF, but it returns a type mismatch error:
val toInt4 = udf[Int, Vector]({ (a) => (a)})
val result = df.withColumn("dist", toInt4(df("dist"))).select("dist")
I think it's easiest to go to the RDD API and then back.
import org.apache.spark.mllib.linalg.DenseVector
import org.apache.spark.sql.DataFrame
import org.apache.spark.rdd.RDD
import sqlContext._
// The original data.
val input: DataFrame =
sc.parallelize(1 to 4)
.map(i => i.toDouble -> new DenseVector(Array(i.toDouble * 2)))
.toDF("id", "dist")
// Turn it into an RDD for manipulation.
val inputRDD: RDD[(Double, DenseVector)] =
input.map(row => row.getAs[Double]("id") -> row.getAs[DenseVector]("dist"))
// Change the DenseVector into an integer array.
val outputRDD: RDD[(Double, Array[Int])] =
inputRDD.mapValues(_.toArray.map(_.toInt))
// Go back to a DataFrame.
val output = outputRDD.toDF("id", "dist")
output.show
You get:
+---+----+
| id|dist|
+---+----+
|1.0| [2]|
|2.0| [4]|
|3.0| [6]|
|4.0| [8]|
+---+----+
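A note if you are on Spark 2.x: DataFrame.map now returns a Dataset rather than an RDD, so the conversion step above needs an explicit .rdd call. A minimal sketch of that adaptation (assuming a SparkSession named spark and the same input as above):

import org.apache.spark.mllib.linalg.DenseVector
import org.apache.spark.rdd.RDD
import spark.implicits._
// In Spark 2.x, drop to the RDD explicitly before mapping over rows.
val outputRDD: RDD[(Double, Array[Int])] =
  input.rdd.map { row =>
    row.getAs[Double]("id") -> row.getAs[DenseVector]("dist").toArray.map(_.toInt)
  }
val output = outputRDD.toDF("id", "dist")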
In Spark 2.0 you can do the following:
import org.apache.spark.mllib.linalg.DenseVector
import org.apache.spark.sql.functions.udf
val vectorHead = udf{ x:DenseVector => x(0) }
df.withColumn("firstValue", vectorHead(df("vectorColumn")))
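The same UDF pattern also recovers the full integer array the question asks for. A minimal sketch, assuming the column always holds dense vectors as in the question (but see the next answer about the ml vs. mllib import):

import org.apache.spark.mllib.linalg.DenseVector
import org.apache.spark.sql.functions.udf
// Convert the whole vector, not just its first element, into an integer array.
val vectorToIntArray = udf { v: DenseVector => v.toArray.map(_.toInt) }
val result = df.withColumn("dist", vectorToIntArray(df("dist")))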
It took me a while to get the answer from @ThomasLuechtefeld working, because I kept running into this very frustrating error:
org.apache.spark.sql.AnalysisException: cannot resolve 'UDF(features_scaled)' due to data type mismatch: argument 1 requires vector type, however, '`features_scaled`' is of vector type.
It turns out I needed to import DenseVector from the ml package instead of the mllib package.
So this is what worked for me:
import org.apache.spark.ml.linalg.DenseVector
import org.apache.spark.sql.functions._
val vectorToColumn = udf{ (x:DenseVector, index: Int) => x(index) }
myDataframe.withColumn("clusters_scaled", vectorToColumn(col("features_scaled"), lit(0)))
Yes, the only difference is the first line. This really should be a comment, but I don't have the reputation. Sorry!
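On Spark 3.0 and later there is also a built-in helper, org.apache.spark.ml.functions.vector_to_array, which avoids the hand-rolled UDF entirely. A sketch, assuming an ml vector column like the one above:

import org.apache.spark.ml.functions.vector_to_array
import org.apache.spark.sql.functions.col
// vector_to_array yields array<double> by default (array<float> with dtype = "float32");
// cast the result if an integer array is needed.
val asArray = myDataframe
  .withColumn("features_arr", vector_to_array(col("features_scaled")))
  .withColumn("features_int", col("features_arr").cast("array<int>"))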