Pyspark and PCA: How can I extract the eigenvectors of this PCA? How can I calculate how much variance they are explaining?

I am reducing the dimensionality of a Spark DataFrame with a PCA model using PySpark (the spark ml library), as follows:

from pyspark.ml.feature import PCA

pca = PCA(k=3, inputCol="features", outputCol="pca_features")
model = pca.fit(data)

where data is a Spark DataFrame with one column labeled features, which holds 3-dimensional DenseVectors:

data.take(1)
Row(features=DenseVector([0.4536,-0.43218, 0.9876]), label=u'class1')

After fitting, I transform the data:

transformed = model.transform(data)
transformed.first()
Row(features=DenseVector([0.4536,-0.43218, 0.9876]), label=u'class1', pca_features=DenseVector([-0.33256, 0.8668, 0.625]))

How can I extract the eigenvectors of this PCA? How can I calculate how much variance they are explaining?

[UPDATE: From Spark 2.2 onwards, PCA and SVD are both available in PySpark - see JIRA ticket SPARK-6227 and PCA & PCAModel for Spark ML 2.2; the original answer below is still applicable for older Spark versions.]
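For newer versions, a minimal sketch of what the Spark >= 2.2 ML API exposes directly (assuming a DataFrame df with a 'features' column of DenseVectors, as in the question; pc and explainedVariance are the standard PCAModel attributes):

from pyspark.ml.feature import PCA

pca = PCA(k=2, inputCol="features", outputCol="pca_features")
model = pca.fit(df)

model.pc                 # DenseMatrix whose columns are the principal components (eigenvectors)
model.explainedVariance  # DenseVector with the proportion of variance explained by each component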

Well, it seems incredible, but indeed there is not a way to extract such information from a PCA decomposition (at least as of Spark 1.5). But again, there have been many similar "complaints" - see here, for example, for not being able to extract the best parameters from a CrossValidatorModel.

Fortunately, some months ago, I attended the 'Scalable Machine Learning' MOOC by AMPLab (Berkeley) & Databricks, i.e. the creators of Spark, where we implemented a full PCA pipeline 'by hand' as part of the homework assignments. I have since modified my functions from back then (rest assured, I got full credit :-) so as to work with dataframes as inputs (instead of RDD's), of the same format as yours (i.e. Rows of DenseVectors containing the numerical features).

We first need to define an intermediate function, estimateCovariance, as follows:

import numpy as np

def estimateCovariance(df):
    """Compute the covariance matrix for a given dataframe.

    Note:
        The multi-dimensional covariance array should be calculated using outer products.  Don't
        forget to normalize the data by first subtracting the mean.

    Args:
        df:  A Spark dataframe with a column named 'features', which (column) consists of DenseVectors.

    Returns:
        np.ndarray: A multi-dimensional array where the number of rows and columns both equal the
            length of the arrays in the input dataframe.
    """
    m = df.select(df['features']).map(lambda x: x[0]).mean()
    dfZeroMean = df.select(df['features']).map(lambda x:   x[0]).map(lambda x: x-m)  # subtract the mean

    return dfZeroMean.map(lambda x: np.outer(x,x)).sum()/df.count()
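
For reference, what this function computes is the (1/n-normalized) covariance matrix of the mean-centered feature vectors:

$$\hat{\Sigma} = \frac{1}{n}\sum_{i=1}^{n}\left(x_i - \bar{x}\right)\left(x_i - \bar{x}\right)^{\top}$$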

Then, we can write a main pca function, as follows:

from numpy.linalg import eigh

def pca(df, k=2):
    """Computes the top `k` principal components, corresponding scores, and all eigenvalues.

    Note:
        All eigenvalues should be returned in sorted order (largest to smallest). `eigh` returns
        each eigenvectors as a column.  This function should also return eigenvectors as columns.

    Args:
        df: A Spark dataframe with a 'features' column, which (column) consists of DenseVectors.
        k (int): The number of principal components to return.

    Returns:
        tuple of (np.ndarray, RDD of np.ndarray, np.ndarray): A tuple of (eigenvectors, `RDD` of
        scores, eigenvalues).  Eigenvectors is a multi-dimensional array where the number of
        rows equals the length of the arrays in the input `RDD` and the number of columns equals
        `k`.  The `RDD` of scores has the same number of rows as `data` and consists of arrays
        of length `k`.  Eigenvalues is an array of length d (the number of features).
     """
    cov = estimateCovariance(df)
    col = cov.shape[1]
    eigVals, eigVecs = eigh(cov)
    inds = np.argsort(eigVals)
    eigVecs = eigVecs.T[inds[-1:-(col+1):-1]]  
    components = eigVecs[0:k]
    eigVals = eigVals[inds[-1:-(col+1):-1]]  # sort eigenvals
    score = df.select(df['features']).map(lambda x: x[0]).map(lambda x: np.dot(x, components.T) )
    # Return the `k` principal components, `k` scores, and all eigenvalues

    return components.T, score, eigVals

Test

Let's first see the results with the existing method, using the example data from the Spark ML PCA documentation (modifying them so as to be all DenseVectors):

 from pyspark.ml.feature import *
 from pyspark.mllib.linalg import Vectors
 data = [(Vectors.dense([0.0, 1.0, 0.0, 7.0, 0.0]),),
         (Vectors.dense([2.0, 0.0, 3.0, 4.0, 5.0]),),
         (Vectors.dense([4.0, 0.0, 0.0, 6.0, 7.0]),)]
 df = sqlContext.createDataFrame(data,["features"])
 pca_extracted = PCA(k=2, inputCol="features", outputCol="pca_features")
 model = pca_extracted.fit(df)
 model.transform(df).collect()

 [Row(features=DenseVector([0.0, 1.0, 0.0, 7.0, 0.0]), pca_features=DenseVector([1.6486, -4.0133])),
  Row(features=DenseVector([2.0, 0.0, 3.0, 4.0, 5.0]), pca_features=DenseVector([-4.6451, -1.1168])),
  Row(features=DenseVector([4.0, 0.0, 0.0, 6.0, 7.0]), pca_features=DenseVector([-6.4289, -5.338]))]

Then, with our method:

 comp, score, eigVals = pca(df)
 score.collect()

 [array([ 1.64857282,  4.0132827 ]),
  array([-4.64510433,  1.11679727]),
  array([-6.42888054,  5.33795143])]

Let me stress that we don't use any collect() methods in the functions we have defined ourselves - score is an RDD, as it should be.

Notice that the signs of our second column are all opposite to the ones derived with the existing method; but this is not an issue: according to the (freely downloadable) An Introduction to Statistical Learning, co-authored by Hastie & Tibshirani, p. 382:

Each principal component loading vector is unique, up to a sign flip. This means that two different software packages will yield the same principal component loading vectors, although the signs of those loading vectors may differ. The signs may differ because each principal component loading vector specifies a direction in p-dimensional space: flipping the sign has no effect as the direction does not change. [...] Similarly, the score vectors are unique up to a sign flip, since the variance of Z is the same as the variance of −Z.
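
As a quick illustration of that sign-flip remark (a trivial numpy check on the first column of our scores above, not part of the pipeline):

import numpy as np
z = np.array([1.64857282, -4.64510433, -6.42888054])  # first column of our scores
np.isclose(np.var(z), np.var(-z))  # True - flipping the sign leaves the variance unchanged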

Finally, now that we have the eigenvalues available, it is trivial to write a function for the percentage of variance explained:

 def varianceExplained(df, k=1):
     """Calculate the fraction of variance explained by the top `k` eigenvectors.

     Args:
         df: A Spark dataframe with a 'features' column, which (column) consists of DenseVectors.
         k: The number of principal components to consider.

     Returns:
         float: A number between 0 and 1 representing the percentage of variance explained
             by the top `k` eigenvectors.
     """
     components, scores, eigenvalues = pca(df, k)  
     return sum(eigenvalues[0:k])/sum(eigenvalues)

 
 varianceExplained(df,1)
 # 0.79439325322305299

As a test, we also check if the variance explained in our example data is 1.0, for k=5 (since the original data are 5-dimensional):

 varianceExplained(df,5)
 # 1.0
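
If scikit-learn is available, the same numbers can be cross-checked locally (a sketch; sklearn normalizes the covariance by n-1 instead of n, but explained-variance ratios are unaffected by that constant):

 import numpy as np
 from sklearn.decomposition import PCA as skPCA

 X = np.array([[0.0, 1.0, 0.0, 7.0, 0.0],
               [2.0, 0.0, 3.0, 4.0, 5.0],
               [4.0, 0.0, 0.0, 6.0, 7.0]])
 skPCA(n_components=2).fit(X).explained_variance_ratio_[0]  # ~0.794, matching varianceExplained(df, 1)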

[Developed & tested with Spark 1.5.0 & 1.5.1]

EDIT:

PCA and SVD are finally both available in PySpark starting with Spark 2.2.0, according to this resolved JIRA ticket SPARK-6227.

Original answer:

The answer given by @desertnaut is actually excellent from a theoretical perspective, but I wanted to present another approach on how to compute the SVD and extract the eigenvectors.

from pyspark.mllib.common import callMLlibFunc, JavaModelWrapper
from pyspark.mllib.linalg.distributed import RowMatrix

class SVD(JavaModelWrapper):
    """Wrapper around the SVD scala case class"""
    @property
    def U(self):
        """ Returns a RowMatrix whose columns are the left singular vectors of the SVD if computeU was set to be True."""
        u = self.call("U")
        if u is not None:
            return RowMatrix(u)

    @property
    def s(self):
        """Returns a DenseVector with singular values in descending order."""
        return self.call("s")

    @property
    def V(self):
        """ Returns a DenseMatrix whose columns are the right singular vectors of the SVD."""
        return self.call("V")

This defines our SVD object. We can now define our computeSVD method using the Java wrapper.

def computeSVD(row_matrix, k, computeU=False, rCond=1e-9):
    """
    Computes the singular value decomposition of the RowMatrix.
    The given row matrix A of dimension (m X n) is decomposed into U * s * V' where
    * s: DenseVector consisting of square root of the eigenvalues (singular values) in descending order.
    * U: (m X k) (left singular vectors) is a RowMatrix whose columns are the eigenvectors of (A X A')
    * V: (n X k) (right singular vectors) is a Matrix whose columns are the eigenvectors of (A' X A)
    :param k: number of singular values to keep. We might return less than k if there are numerically zero singular values.
    :param computeU: Whether or not to compute U. If set to be True, then U is computed by A * V * sigma^-1
    :param rCond: the reciprocal condition number. All singular values smaller than rCond * sigma(0) are treated as zero, where sigma(0) is the largest singular value.
    :returns: SVD object
    """
    java_model = row_matrix._java_matrix_wrapper.call("computeSVD", int(k), computeU, float(rCond))
    return SVD(java_model)
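
Note that from Spark 2.2.0 onwards (the resolved JIRA ticket mentioned above), RowMatrix exposes computeSVD directly in PySpark, so this wrapper is only needed on older versions; roughly:

# Spark >= 2.2 only: built-in equivalent of the wrapper above
# (mat is a pyspark.mllib.linalg.distributed.RowMatrix, as built in the example below)
svd = mat.computeSVD(2, computeU=True)
svd.s, svd.V  # singular values and right singular vectors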

Now, let's apply this to an example:

from pyspark.ml.feature import *
from pyspark.mllib.linalg import Vectors

data = [(Vectors.dense([0.0, 1.0, 0.0, 7.0, 0.0]),), (Vectors.dense([2.0, 0.0, 3.0, 4.0, 5.0]),), (Vectors.dense([4.0, 0.0, 0.0, 6.0, 7.0]),)]
df = sqlContext.createDataFrame(data,["features"])

pca_extracted = PCA(k=2, inputCol="features", outputCol="pca_features")

model = pca_extracted.fit(df)
features = model.transform(df) # this creates a DataFrame with the regular features and pca_features

# We can now extract the pca_features to prepare our RowMatrix.
pca_features = features.select("pca_features").rdd.map(lambda row : row[0])
mat = RowMatrix(pca_features)

# Once the RowMatrix is ready we can compute our Singular Value Decomposition
svd = computeSVD(mat,2,True)
svd.s
# DenseVector([9.491, 4.6253])
svd.U.rows.collect()
# [DenseVector([0.1129, -0.909]), DenseVector([0.463, 0.4055]), DenseVector([0.8792, -0.0968])]
svd.V
# DenseMatrix(2, 2, [-0.8025, -0.5967, -0.5967, 0.8025], 0)
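
Incidentally, if the rows of the RowMatrix have been mean-centered beforehand, each eigenvalue of the covariance matrix equals s_i^2 / (n - 1), so the fraction of variance explained by the top k components can also be read off the singular values; a small sketch (assuming all singular values were computed):

import numpy as np
s = np.array(svd.s.toArray())
k = 1
(s[:k] ** 2).sum() / (s ** 2).sum()  # fraction of variance explained by the top k components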

The easiest answer to your question is to feed an identity matrix to your model.

identity_input = [(Vectors.dense([1.0, .0, 0.0, .0, 0.0]),),(Vectors.dense([.0, 1.0, .0, .0, .0]),), \
              (Vectors.dense([.0, 0.0, 1.0, .0, .0]),),(Vectors.dense([.0, 0.0, .0, 1.0, .0]),),
              (Vectors.dense([.0, 0.0, .0, .0, 1.0]),)]
df_identity = sqlContext.createDataFrame(identity_input,["features"])
identity_features = model.transform(df_identity)

This should give you the principal components.
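
To turn this into an explicit matrix of loadings, one could collect and stack the transformed identity rows (a sketch; pcs is just an illustrative name):

import numpy as np
pcs = np.array(identity_features.select("pca_features").rdd.map(lambda r: r[0].toArray()).collect())
# row i of pcs holds the loadings of the i-th original feature on the k principal components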

I think eliasah's answer is better in terms of the Spark framework, because desertnaut solves the problem by using numpy's functions instead of Spark's actions. However, eliasah's answer is missing the normalization of the data. So, I would add the following lines to eliasah's answer:

from pyspark.ml.feature import StandardScaler
standardizer = StandardScaler(withMean=True, withStd=False,
                              inputCol='features',
                              outputCol='std_features')
model = standardizer.fit(df)
output = model.transform(df)
pca_features = output.select("std_features").rdd.map(lambda row : row[0])
mat = RowMatrix(pca_features)
svd = computeSVD(mat,5,True)

Eventually, svd.V and identity_features.select("pca_features").collect() should have identical values.

I summarized PCA and its use in Spark and sklearn in this blog post.

In Spark 2.2+, you can now easily get the explained variance as:

from pyspark.ml.feature import VectorAssembler
assembler = VectorAssembler(inputCols=<columns of your original dataframe>, outputCol="features")
df = assembler.transform(<your original dataframe>).select("features")
from pyspark.ml.feature import PCA
pca = PCA(k=10, inputCol="features", outputCol="pcaFeatures")
model = pca.fit(df)
sum(model.explainedVariance)