使用 PySpark 进行 Pca 分析

Pca analysis with PySpark

我正在使用 PySpark 作为工具进行 PCA 分析,但由于从 csv 文件读取的数据的兼容性,我遇到了错误。我该怎么办?你能帮帮我吗?

from __future__ import print_function
from pyspark.ml.feature import PCA
from pyspark.ml.linalg import Vectors, VectorUDT

from pyspark.sql import SparkSession
from pyspark import SparkConf, SparkContext
from pyspark.sql.functions import udf
import pandas as pd
import numpy as np
from numpy import array


conf = SparkConf().setAppName("building a warehouse")
sc = SparkContext(conf=conf)

if __name__ == "__main__":
    spark = SparkSession\
        .builder\
        .appName("PCAExample")\
        .getOrCreate()



   data = sc.textFile('dataset.csv') \
        .map(lambda line:  line.split(','))\
        .collect()
   #create a data frame from data read from csv file 
   df = spark.createDataFrame(data, ["features"])
   #convert data to vector udt

   df.show()


   pca = PCA(k=3, inputCol="features", outputCol="pcaFeatures")
   model = pca.fit(df)

   result =  model.transform(df).select("pcaFeatures")
   result.show(truncate=False)

   spark.stop()

这是我遇到的错误:

File "C:/spark/spark-2.1.0-bin-hadoop2.7/bin/pca_bigdata.py", line 38, in       <module>
model = pca.fit(df)
pyspark.sql.utils.IllegalArgumentException: u'requirement failed: Column features must be of type org.apache.spark.ml.linalg.VectorUDT@3bfc3ba7 but was actually StringType.'

此处错误指定其自身列需要 VectorUDT 而不是 StringType。所以这对你有用:-

from pyspark.mllib.linalg import SparseVector, VectorUDT       
from pyspark.sql.types import StringType, StructField, StructType
df = spark.createDataFrame(data, StructType([
                         StructField("features", VectorUDT(), True)
                       ]))