PySpark PCA:如何将数据帧行从多列转换为单列 DenseVector?
PySpark PCA: how to convert dataframe rows from multiple columns to a single column DenseVector?
我想使用 PySpark (Spark 1.6.2) 对 Hive table 中存在的数值数据执行主成分分析 (PCA)。我能够将 Hive table 导入 Spark 数据框:
>>> from pyspark.sql import HiveContext
>>> hiveContext = HiveContext(sc)
>>> dataframe = hiveContext.sql("SELECT * FROM my_table")
>>> type(dataframe)
<class 'pyspark.sql.dataframe.DataFrame'>
>>> dataframe.columns
['par001', 'par002', 'par003', etc...]
>>> dataframe.collect()
[Row(par001=1.1, par002=5.5, par003=8.2, etc...), Row(par001=0.0, par002=5.7, par003=4.2, etc...), etc...]
有一个优秀的 Whosebug post 展示了如何在 PySpark 中执行 PCA:
在 post 的 'test' 部分,@desertnaut 创建了一个只有一列的数据框(称为 'features'):
>>> from pyspark.ml.feature import *
>>> from pyspark.mllib.linalg import Vectors
>>> data = [(Vectors.dense([0.0, 1.0, 0.0, 7.0, 0.0]),),
... (Vectors.dense([2.0, 0.0, 3.0, 4.0, 5.0]),),
... (Vectors.dense([4.0, 0.0, 0.0, 6.0, 7.0]),)]
>>> df = sqlContext.createDataFrame(data,["features"])
>>> type(df)
<class 'pyspark.sql.dataframe.DataFrame'>
>>> df.columns
['features']
>>> df.collect()
[Row(features=DenseVector([0.0, 1.0, 0.0, 7.0, 0.0])), Row(features=DenseVector([2.0, 0.0, 3.0, 4.0, 5.0])), Row(features=DenseVector([4.0, 0.0, 0.0, 6.0, 7.0]))]
@desertnaut 的示例数据框中的每一行都包含一个 DenseVector
对象,然后由 pca
函数使用。
Q) 如何将 Hive 中的数据帧转换为单列数据帧 ("features"),其中每行包含一个 DenseVector
表示原始行中的所有值?
你应该使用 VectorAssembler
。如果数据类似这样:
from pyspark.sql import Row
data = sc.parallelize([
Row(par001=1.1, par002=5.5, par003=8.2),
Row(par001=0.0, par002=5.7, par003=4.2)
]).toDF()
你应该导入所需的 class:
from pyspark.ml.feature import VectorAssembler
创建一个实例:
assembler = VectorAssembler(inputCols=data.columns, outputCol="features")
转换和select:
assembler.transform(data).select("features")
您还可以使用用户定义的函数。在 Spark 1.6 中,从 mllib
:
导入 Vectors
和 VectorUDT
from pyspark.mllib.linalg import Vectors, VectorUDT
和 udf
来自 sql.functions
:
from pyspark.sql.functions import udf, array
和select:
data.select(
udf(Vectors.dense, VectorUDT())(*data.columns)
).toDF("features")
这不那么冗长,但速度要慢得多。
我想使用 PySpark (Spark 1.6.2) 对 Hive table 中存在的数值数据执行主成分分析 (PCA)。我能够将 Hive table 导入 Spark 数据框:
>>> from pyspark.sql import HiveContext
>>> hiveContext = HiveContext(sc)
>>> dataframe = hiveContext.sql("SELECT * FROM my_table")
>>> type(dataframe)
<class 'pyspark.sql.dataframe.DataFrame'>
>>> dataframe.columns
['par001', 'par002', 'par003', etc...]
>>> dataframe.collect()
[Row(par001=1.1, par002=5.5, par003=8.2, etc...), Row(par001=0.0, par002=5.7, par003=4.2, etc...), etc...]
有一个优秀的 Whosebug post 展示了如何在 PySpark 中执行 PCA:
在 post 的 'test' 部分,@desertnaut 创建了一个只有一列的数据框(称为 'features'):
>>> from pyspark.ml.feature import *
>>> from pyspark.mllib.linalg import Vectors
>>> data = [(Vectors.dense([0.0, 1.0, 0.0, 7.0, 0.0]),),
... (Vectors.dense([2.0, 0.0, 3.0, 4.0, 5.0]),),
... (Vectors.dense([4.0, 0.0, 0.0, 6.0, 7.0]),)]
>>> df = sqlContext.createDataFrame(data,["features"])
>>> type(df)
<class 'pyspark.sql.dataframe.DataFrame'>
>>> df.columns
['features']
>>> df.collect()
[Row(features=DenseVector([0.0, 1.0, 0.0, 7.0, 0.0])), Row(features=DenseVector([2.0, 0.0, 3.0, 4.0, 5.0])), Row(features=DenseVector([4.0, 0.0, 0.0, 6.0, 7.0]))]
@desertnaut 的示例数据框中的每一行都包含一个 DenseVector
对象,然后由 pca
函数使用。
Q) 如何将 Hive 中的数据帧转换为单列数据帧 ("features"),其中每行包含一个 DenseVector
表示原始行中的所有值?
你应该使用 VectorAssembler
。如果数据类似这样:
from pyspark.sql import Row
data = sc.parallelize([
Row(par001=1.1, par002=5.5, par003=8.2),
Row(par001=0.0, par002=5.7, par003=4.2)
]).toDF()
你应该导入所需的 class:
from pyspark.ml.feature import VectorAssembler
创建一个实例:
assembler = VectorAssembler(inputCols=data.columns, outputCol="features")
转换和select:
assembler.transform(data).select("features")
您还可以使用用户定义的函数。在 Spark 1.6 中,从 mllib
:
Vectors
和 VectorUDT
from pyspark.mllib.linalg import Vectors, VectorUDT
和 udf
来自 sql.functions
:
from pyspark.sql.functions import udf, array
和select:
data.select(
udf(Vectors.dense, VectorUDT())(*data.columns)
).toDF("features")
这不那么冗长,但速度要慢得多。