(Array/ML Vector/MLlib Vector) RDD to ML Vector DataFrame column
I need to convert an RDD into a single-column o.a.s.ml.linalg.Vector DataFrame so I can use the ML algorithms, specifically K-Means in this case. This is my RDD:
val parsedData = sc.textFile("/digits480x.csv").map(s => Row(org.apache.spark.mllib.linalg.Vectors.dense(s.split(',').slice(0,64).map(_.toDouble))))
I tried doing what that answer suggests, with no luck; I suppose it is because you end up with an MLlib Vector, which throws a type-mismatch error when the algorithm runs. Now, if I change this:
import org.apache.spark.mllib.linalg.{Vectors, VectorUDT}
val schema = new StructType()
.add("features", new VectorUDT())
To this:
import org.apache.spark.ml.linalg.{Vectors, VectorUDT}
val parsedData = sc.textFile("/digits480x.csv").map(s => Row(org.apache.spark.ml.linalg.Vectors.dense(s.split(',').slice(0,64).map(_.toDouble))))
val schema = new StructType()
.add("features", new VectorUDT())
I get an error because the ML VectorUDT is private.
I also tried converting the RDD into a DataFrame of Array[Double] and then getting an ML dense vector, like this:
var parsedData = sc.textFile("/home/pililo/Documents/Mi_Memoria/Codigo/Datasets/Digits/digits480x.csv").map(s => Row(s.split(',').slice(0,64).map(_.toDouble)))
parsedData: org.apache.spark.rdd.RDD[org.apache.spark.sql.Row]
val schema2 = new StructType().add("features", ArrayType(DoubleType))
schema2: org.apache.spark.sql.types.StructType = StructType(StructField(features,ArrayType(DoubleType,true),true))
val df = spark.createDataFrame(parsedData, schema2)
df: org.apache.spark.sql.DataFrame = [features: array<double>]
val df2 = df.map{ case Row(features: Array[Double]) => Row(org.apache.spark.ml.linalg.Vectors.dense(features)) }
Which throws the following error, even with spark.implicits._ imported:
error: Unable to find encoder for type stored in a Dataset. Primitive types (Int, String, etc) and Product types (case classes) are supported by importing spark.implicits._ Support for serializing other types will be added in future releases.
Any help is greatly appreciated, thanks!
Off the top of my head:
Use the csv source and VectorAssembler:
import scala.util.Try
import org.apache.spark.ml.linalg._
import org.apache.spark.ml.feature.VectorAssembler
import org.apache.spark.sql.functions.col
import spark.implicits._  // for the $"features" syntax

val path: String = ???
val n: Int = ???
val m: Int = ???

// Read the CSV as strings, cast the feature columns to double,
// and let VectorAssembler build the ML Vector column.
val raw = spark.read.csv(path)
val featureCols = raw.columns.slice(n, m)
val exprs = featureCols.map(c => col(c).cast("double"))

val assembler = new VectorAssembler()
  .setInputCols(featureCols)
  .setOutputCol("features")

assembler.transform(raw.select(exprs: _*)).select($"features")
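Either way, the assembled frame can be fed straight into the ML K-Means estimator the question is aiming for. A minimal sketch, assuming the assembler defined above and a placeholder cluster count:

import org.apache.spark.ml.clustering.KMeans

// Hypothetical follow-up: fit K-Means on the assembled features column.
val features = assembler.transform(raw.select(exprs: _*)).select($"features")
val kmeans = new KMeans()
  .setK(10)  // placeholder value, not from the original post
  .setFeaturesCol("features")
val model = kmeans.fit(features)
model.clusterCenters.foreach(println)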
Use the text source and a UDF:
import org.apache.spark.sql.functions.udf

// Parse one line into an ML dense vector; Try turns malformed lines into None.
def parse_(n: Int, m: Int)(s: String) = Try(
  Vectors.dense(s.split(',').slice(n, m).map(_.toDouble))
).toOption
def parse(n: Int, m: Int) = udf(parse_(n, m) _)

val raw = spark.read.text(path)
raw.select(parse(n, m)(col(raw.columns.head)).alias("features"))
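Because of the Try, lines that fail to parse come back as null in the features column; if that matters, they can be filtered out before fitting anything. A small sketch reusing the frame built above:

// Keep only rows whose line parsed into a vector.
val parsed = raw.select(parse(n, m)(col(raw.columns.head)).alias("features"))
val clean = parsed.where(col("features").isNotNull)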
Use the text source and skip the Row wrapping:
spark.read.text(path).as[String].map(parse_(n, m)).toDF
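If the downstream estimator expects the column to be named features, the name can be passed to toDF directly; this is just an assumption about the rest of the pipeline:

spark.read.text(path).as[String].map(parse_(n, m)).toDF("features")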