使用 Apache Spark 在特定列上应用 PCA

Apply PCA on specific columns with Apache Spark

我正在尝试将 PCA 应用于包含 header 和字段的数据集 这是我使用的代码,任何帮助 select 我们应用 PCA 的特定列。

val inputMatrix = sc.textFile("C:/Users/mhattabi/Desktop/Realase of 01_06_2017/TopDrive_WithoutConstant.csv").map { line =>
  val values = line.split(",").map(_.toDouble)
  Vectors.dense(values)
}

val mat: RowMatrix = new RowMatrix(inputMatrix)
val pc: Matrix = mat.computePrincipalComponents(4)
// Project the rows to the linear space spanned by the top 4 principal components.

val projected: RowMatrix = mat.multiply(pc)

//更新版本 我试着这样做

val spark = SparkSession.builder.master("local").appName("my-spark-app").getOrCreate()
val dataframe = spark.read.format("com.databricks.spark.csv")

val columnsToUse: Seq[String] =  Array("Col0","Col1", "Col2", "Col3", "Col4").toSeq
val k: Int = 2

val df = spark.read.format("csv").options(Map("header" -> "true", "inferSchema" -> "true")).load("C:/Users/mhattabi/Desktop/donnee/cassandraTest_1.csv")

val rf = new RFormula().setFormula(s"~ ${columnsToUse.mkString(" + ")}")
val pca = new PCA().setInputCol("features").setOutputCol("pcaFeatures").setK(k)

val featurized = rf.fit(df).transform(df)
//prinpal component
val principalComponent = pca.fit(featurized).transform(featurized)
principalComponent.select("pcaFeatures").show(4,false)

+-----------------------------------------+
|pcaFeatures                              |
+-----------------------------------------+
|[-0.536798281241379,0.495499034754084]   |
|[-0.32969328815797916,0.5672811417154808]|
|[-1.32283465170085,0.5982789033642704]   |
|[-0.6199718696225502,0.3173072633712586] |
+-----------------------------------------+

我得到了这个主要组件,我想将这个问题保存在 csv 文件中并添加 header.Any 非常感谢帮助 任何帮助将不胜感激。

非常感谢

java.lang.NumberFormatException: For input string: "DateTime"

这意味着在您的输入文件中有一个值 DateTime 然后您尝试将其转换为 Double

可能它在您输入文件header中的某处

在这种情况下您可以使用 RFormula :

import org.apache.spark.ml.feature.{RFormula, PCA}

val columnsToUse: Seq[String] = ???
val k: Int = ???

val df = spark.read.format("csv").options(Map("header" -> "true", "inferSchema" -> "true")).load("/tmp/foo.csv")

val rf = new RFormula().setFormula(s"~ ${columnsToUse.mkString(" + ")}")
val pca = new PCA().setInputCol("features").setK(k)

val featurized = rf.fit(df).transform(df)
val projected = pca.fit(featurized).transform(featurized)