使用 PySpark 的 Spark 2.3.0 示例中的 PCA

PCA in Spark 2.3.0 Examples with PySpark

我有一个 Spark 数据框,我想将其用于 运行 一个简单的 PCA 示例。我看过 this example 并注意到它有效,因为它们将特征转置为向量:

from pyspark.ml.linalg import Vectors
>>> data = [(Vectors.sparse(5, [(1, 1.0), (3, 7.0)]),),
...     (Vectors.dense([2.0, 0.0, 3.0, 4.0, 5.0]),),
...     (Vectors.dense([4.0, 0.0, 0.0, 6.0, 7.0]),)]
>>> df = spark.createDataFrame(data,["features"])
>>> pca = PCA(k=2, inputCol="features", outputCol="pca_features")

我正在尝试使用我自己创建的 Spark Dataframe 来重现相同类型的简单 PCA。我如何将我的 Spark DataFrame 转换为与上述类似的形式,以便我可以 运行 它具有一个输入列和一个输出列?

我研究过使用 RowMatrix as shown here,但我不明白这是否可行(请参阅下面的错误)。

>>>from pyspark.mllib.linalg import Vectors
>>>from pyspark.mllib.linalg.distributed import RowMatrix
>>>from pyspark.ml.feature import PCA
>>>master = pd.read_parquet('master.parquet',engine='fastparquet')
>>>A = sc.parallelize(master)
>>>mat = RowMatrix(A)
>>>pc = mat.computePrincipalComponents(4)

Py4JJavaError: An error occurred while calling o382.computePrincipalComponents. : org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 1.0 failed 1 times, most recent failure: Lost task 0.0 in stage 1.0 (TID 1, localhost, executor driver): org.apache.spark.api.python.PythonException: Traceback (most recent call last)

在用于 mllib 库的 Pyspark 中,您需要将所有特征转换为单个特征向量。 您可以使用 Vector Assembler 执行相同的操作: https://spark.apache.org/docs/latest/ml-features.html#vectorindexer

from pyspark.ml.feature import VectorAssembler
assembler = VectorAssembler(inputCols=inputColumnsList,outputCol='features')
assembler.transform(df)

其中 inputColsList 包含您要使用的所有功能的列表