Select most important variables in terms of their contributions to PCA in Spark
I am performing a Principal Component Analysis (PCA) in Spark with Scala.
My output only shows the principal component score vectors. But (1) how can I display the principal component loadings, and (2) how can I select the variables that contribute most to the first and second components (i.e. that have particularly high loadings on those components)? I propose looking at the first two components since they usually capture the most variance.
Any help or advice is appreciated!
What I have implemented so far:
import org.apache.spark.ml.feature.{PCA, VectorAssembler, StandardScaler}
import org.apache.spark.ml.linalg.Vectors
val dataset = (spark.createDataFrame(
Seq((1.2, 1.3, 1.7, 1.9), (2.2, 2.3, 2.7, 2.9), (3.2, 3.3, 3.5, 3.7))
).toDF("f1", "f2", "f3", "f4"))
val assembler = new VectorAssembler()
.setInputCols(Array("f1", "f2", "f3", "f4"))
.setOutputCol("features")
val output = assembler.transform(dataset)
// Normalizing each feature to have a zero mean
val scaler = new StandardScaler()
.setInputCol("features")
.setOutputCol("scaledFeatures")
.setWithStd(false)
.setWithMean(true)
val scalerModel = scaler.fit(output)
val scaledData = scalerModel.transform(output)
// Compute PCA on a vector of features
// Display the principal component score vectors
val pca = new PCA()
.setInputCol("scaledFeatures")
.setOutputCol("pcaFeatures")
.setK(2)
.fit(scaledData)
// ? How to extract the variables contributing most to principal components?
The PCAModel documentation says that pca.pc will give you a DenseMatrix whose columns are the principal components.
I don't think it specifies whether the columns are in order; I would guess they are, but you may want to check.
From there you can use .colIter to get the columns individually and pick the entries with the highest absolute values to see which variables contribute the most.
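A minimal sketch of that idea, assuming the pca model and the feature names f1..f4 from the question above (the ranking logic is just an illustration, not part of the Spark API):
val featureNames = Array("f1", "f2", "f3", "f4")
// pca.pc is a numFeatures x k DenseMatrix whose columns are the loading vectors.
pca.pc.colIter.zipWithIndex.foreach { case (loadings, i) =>
  // Rank the input variables by the absolute value of their loading on this component.
  val ranked = featureNames
    .zip(loadings.toArray)
    .sortBy { case (_, w) => -math.abs(w) }
  println(s"PC${i + 1}: " + ranked.map { case (name, w) => f"$name = $w%.3f" }.mkString(", "))
}
// pca.explainedVariance gives the proportion of variance captured by each component,
// which is a quick way to confirm the columns are ordered by importance.
println(pca.explainedVariance)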
I implemented a class in Python that computes pretty much everything related to principal components in PySpark. I know your code is in Scala, but you can take a look and see whether it helps. The implementation of the class is as follows:
from pyspark import SparkConf,SparkContext
from pyspark.sql.context import HiveContext
from pyspark.sql import Row
from pyspark.ml.feature import VectorAssembler,PCA,StandardScaler
from pyspark.ml.clustering import KMeans
from pyspark.ml.param import Param,Params
from pyspark.sql.types import *
from pyspark.mllib.common import callMLlibFunc, JavaModelWrapper
from pyspark.mllib.linalg.distributed import RowMatrix
from pyspark.mllib.linalg import _convert_to_vector, Vectors, Matrix, DenseMatrix
from pyspark.sql.functions import array, col, explode, struct, lit, udf, sum, when,rowNumber,avg,pow,sqrt,mean,log,desc
from pyspark.mllib.linalg import SparseVector, DenseVector, VectorUDT
from pyspark.sql.window import Window
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.regression import LinearRegression
import time, os, sys, json, math
import datetime as dt
import subprocess
import getpass
import pdb
import csv
import pandas as pd
import numpy as np
from copy import copy
from numpy.linalg import eigh
import itertools
class princomp:
def __init__(self,n=5,std=True,prefix='pcomp'):
self.n=n
self.std=std
self.prefix=prefix
self.cols = [self.prefix+str(i) for i in list(range(1,self.n+1))]
self.inputcol = "std_features"
self.outputcol = "pca_features"
self.vars = 'all'
self.model = PCA(k=self.n,inputCol=self.inputcol,outputCol=self.outputcol)
self.components = sqlContext.createDataFrame(pd.DataFrame([0]))
self.projections = sqlContext.createDataFrame(pd.DataFrame([0]))
self.result = sqlContext.createDataFrame(pd.DataFrame([0]))
def helpme(self):
print ('|-- Please note that output_parameters will have null values before calling the fit method')
print ('|-- n : input_parameter : sets the number of principal components, default = 5')
print ('|-- std : input_parameter : True/False value specifying whether to standardize the principal components, default = True')
print ('|-- prefix : input_parameter : string specifying the prefix for columns of principal components , default = pcomp')
print ('|-- vars : input_parameter : list of variable names to be used for principal components, default = all')
print ('|-- components : output_parameter : pandas dataframe of coefficients of different input columns for computing principal components')
print ('|-- result : output_parameter : spark dataframe of original variables joined with projections of principal components')
print ('|-- covariance : output_parameter : Numpy array of the covariance matrix')
print ('|-- eigvals : output_parameter : Numpy array of all eigenvalues')
print ('|-- eigvecs : output_parameter : Numpy array of all eigenvectors')
print ('|-- varianceexplained : output_parameter : variance explained by the n principal components')
print ('|-- outputcompfile(file) : method : outputs the components matrix to the specified file')
print ('|-- fit(inputdf,myfeatures) : method : fit method which computes all output parameters')
# SET methods
def setn(self,val):
self.n = val
self.cols = [self.prefix+str(i) for i in list(range(1,self.n+1))]
self.model = PCA(k=self.n,inputCol=self.inputcol,outputCol=self.outputcol)
def setstd(self,val):
self.std = val
def setprefix(self,val):
self.prefix=val
self.cols = [self.prefix+str(i) for i in list(range(1,self.n+1))]
def setcols(self,val):
self.cols = val
def setinputcol(self,val):
self.inputcol = val
self.model = PCA(k=self.n,inputCol=self.inputcol,outputCol=self.outputcol)
def setoutputcol(self,val):
self.outputcol = val
self.model = PCA(k=self.n,inputCol=self.inputcol,outputCol=self.outputcol)
def setvars(self,val):
self.vars = val
def setmodel(self,val):
self.model = val
def setcomponents(self,val):
self.components=val
def setprojections(self,val):
self.projections=val
def setresult(self,val):
self.result = val
# GET methods
def getn(self):
return self.n
def getstd(self):
return self.std
def getprefix(self):
return self.prefix
def getcols(self):
return self.cols
def getinputcol(self):
return self.inputcol
def getoutputcol(self):
return self.outputcol
def getvars(self):
return self.vars
def getmodel(self):
return self.model
def getcomponents(self):
return self.components
def getprojections(self):
return self.projections
def getresult(self):
return self.result
# CORE methods
    def vectorizedf(self, inputdf, vars='all'):
"""Returns the input spark dataframe with an additional column of dense vector features"""
if vars=='all':
myfeatures = inputdf.columns
else:
myfeatures=vars
assembler = VectorAssembler(inputCols = myfeatures,outputCol="features")
assembled = assembler.transform(inputdf)
as_dense = udf(
lambda v: DenseVector(v.toArray()) if v is not None else None,
VectorUDT()
)
df_dense = assembled.withColumn("features1", as_dense(assembled.features))
df_dense2 = df_dense.drop("features")
df_dense3 = df_dense2.withColumnRenamed("features1","features")
return df_dense3
def outputcompfile(self,filewlocation):
""" Outputs the loading of principal components to the file specified"""
df = self.components
df.to_csv(filewlocation,index=False)
print ("Component matrix is now available at the location : "+filewlocation)
def identity(self):
""" Outputs an identity matrix in the form of features column in a dataframe"""
iden = np.identity(len(self.vars)).tolist()
rddi = sc.parallelize(iden)
df_identity = rddi.map(lambda line:Row(std_features=Vectors.dense(line))).toDF()
return df_identity
def fit(self,inputdf,myfeatures):
""" Fits the input dataframe in a PCA model with the given features """
start_time = time.time() # Start Timer
if myfeatures=='all':
self.vars = inputdf.columns
else:
self.vars = myfeatures
# vectorize and scale
df_dense = self.vectorizedf(inputdf,self.vars)
        df_normalized = self.scalemeanstd(df_dense)  # note: scalemeanstd is not defined in the snippet shown here; it is assumed to standardize the features column into std_features
# Compute covariance matrix, eigenvalues and eigenvectors
dfzeromean = df_normalized.select(self.inputcol)
self.covariance = dfzeromean.map(lambda x:np.outer(x,x)).sum()/dfzeromean.count()
col1 = self.covariance.shape[1]
eigvals,eigvecs = eigh(self.covariance)
inds = np.argsort(eigvals)
self.eigvals = eigvals[inds[-1:-(col1+1):-1]]
self.eigvecs = -1*eigvecs.T[inds[-1:-(col1+1):-1]]
self.varianceexplained = np.sum(self.eigvals[0:self.n])/np.sum(self.eigvals)
# Fit PCA model
model1 = self.model.fit(df_normalized)
df_features = model1.transform(df_normalized)
# Compute components and put in a pandas dataframe
df_identity = self.identity()
components = model1.transform(df_identity)
components = components.withColumnRenamed('pca_features','components')
edf_rdd = components.select("components").rdd.map(lambda x: tuple(x.components.toArray().tolist()))
edf_pandas = edf_rdd.toDF(self.cols).toPandas()
comp_ind = sqlContext.createDataFrame([Row(industries=self.vars)])
comp_ind_pandas = comp_ind.select(explode(comp_ind.industries).alias("Variable")).toPandas()
self.components = pd.concat([comp_ind_pandas,edf_pandas],axis=1)
# Compute and standardize projections if self.std = True
if self.std:
projections1=self.scalemeanstd(df_features,inputcol = "pca_features",outputcol = "projections")
else:
projections1 = df_features.withColumnRenamed('pca_features','projections')
# Prepare data for output
self.projections = projections1.select('projections')
drop_list = ['features','std_features','pca_features']
projections2 = projections1.select([c for c in projections1.columns if c not in drop_list])
l = ['x.'+c for c in inputdf.columns]
cst = '['+",".join(l)+']'
final_df = projections1.rdd.map(lambda x: tuple(eval(cst))+tuple(x.projections.toArray().tolist())).toDF(inputdf.columns+self.cols)
self.result = final_df
print("PCA fitting took a total of %s seconds " % (time.time() - start_time))
The class can be used as follows:
fs = ep.princomp(n=15)
fs.fit(df,select_features) # here select_features is the list of columns that you wish to perform PCA on
fs.outputcompfile('/some/output/location/components.csv')
print (fs.eigvals)
print (fs.varianceexplained)
This lets you write the components file, containing all the factor loadings, to a location of your choice. Using that file, you can find out which variables contribute most to the first n principal components.
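For example, back in Scala, a small sketch of that last step, assuming the components.csv written above is accessible to Spark and has a Variable column plus loading columns pcomp1..pcompN:
import org.apache.spark.sql.functions.{abs, col}

val components = spark.read
  .option("header", "true")
  .option("inferSchema", "true")
  .csv("/some/output/location/components.csv")

// Variables with the largest absolute loading on the first principal component.
components
  .orderBy(abs(col("pcomp1")).desc)
  .select("Variable", "pcomp1")
  .show(10)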