PySpark generic model for PCA data processing
I'm doing data analysis with PCA. I wrote this code in PySpark and it works fine, but it only handles data read from a CSV file with exactly five columns ["a","b","c","d","e"]. I'd like to write generic code that computes the PCA for a CSV file with any number of columns. What should I add?
Here is my code:
#########################! importing libraries !########################
from __future__ import print_function
from pyspark import SparkContext
from pyspark.sql import SparkSession, SQLContext
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import PCA, VectorAssembler
from pyspark.ml import Pipeline
########################! main script !#################################
sc = SparkContext("local", "pca-app")
sqlContext = SQLContext(sc)

if __name__ == "__main__":
    spark = SparkSession\
        .builder\
        .appName("PCAExample")\
        .getOrCreate()

    data = sc.textFile('dataset.csv') \
        .map(lambda line: [float(k) for k in line.split(';')]) \
        .collect()
    df = spark.createDataFrame(data, ["a","b","c","d","e"])
    df.show()

    vecAssembler = VectorAssembler(inputCols=["a","b","c","d","e"], outputCol="features")
    pca = PCA(k=2, inputCol="features", outputCol="pcaFeatures")
    pipeline = Pipeline(stages=[vecAssembler, pca])
    model = pipeline.fit(df)
    result = model.transform(df).select("pcaFeatures")
    result.show(truncate=False)

    spark.stop()
You only need to change a few lines to make your code generic:
fileObj = sc.textFile('dataset.csv')
# Take the column names from the file's header line; the file is
# ';'-separated, so split on ';' rather than on whitespace
header = fileObj.first()
columns = header.split(';')
# Skip the header row before casting values, otherwise float() raises
# a ValueError on the column names
data = fileObj.filter(lambda line: line != header) \
    .map(lambda line: [float(k) for k in line.split(';')]) \
    .collect()
df = spark.createDataFrame(data, columns)
df.show()
vecAssembler = VectorAssembler(inputCols=columns, outputCol="features")
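The core idea — derive the column list from the file's header instead of hardcoding it — can be sketched without Spark using Python's standard csv module. The file contents below are made up for illustration:

```python
import csv
import io

# A made-up ';'-separated CSV with a header row (stand-in for dataset.csv)
raw = "a;b;c;d;e;f\n1;2;3;4;5;6\n7;8;9;10;11;12\n"

reader = csv.reader(io.StringIO(raw), delimiter=';')
rows = list(reader)

columns = rows[0]                                     # header row gives the column names
data = [[float(v) for v in row] for row in rows[1:]]  # remaining rows become floats

# Works for any number of columns, not just five
print(columns)  # ['a', 'b', 'c', 'd', 'e', 'f']
print(data)
```

Note that in recent PySpark versions the same effect is usually achieved with `spark.read.csv('dataset.csv', sep=';', header=True, inferSchema=True)`, which parses the header and infers column types for you and returns a DataFrame directly, skipping the RDD round-trip entirely.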