PCA on PySpark irregular execution
I am running PCA with PySpark on a csv file. I am seeing some strange behaviour; my code sometimes works fine, but sometimes returns this error:
File "C:/spark/spark-2.1.0-bin-hadoop2.7/bin/pca_final2.py", line 25, in <module>
columns = (fileObj.first()).split(';')
File "C:\spark\spark-2.1.0-bin-hadoop2.7\python\lib\pyspark.zip\pyspark\rdd.py", line 1361, in first
File "C:\spark\spark-2.1.0-bin-hadoop2.7\python\lib\pyspark.zip\pyspark\rdd.py", line 1343, in take
File "C:\spark\spark-2.1.0-bin-hadoop2.7\python\lib\pyspark.zip\pyspark\context.py", line 965, in runJob
File "C:\spark\spark-2.1.0-bin-hadoop2.7\python\lib\py4j-0.10.4-src.zip\py4j\java_gateway.py", line 1133, in __call__
File "C:\spark\spark-2.1.0-bin-hadoop2.7\python\lib\pyspark.zip\pyspark\sql\utils.py", line 63, in deco
File "C:\spark\spark-2.1.0-bin-hadoop2.7\python\lib\py4j-0.10.4-src.zip\py4j\protocol.py", line 319, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.runJob.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 1 times, most recent failure: Lost task 0.0 in stage 0.0 (TID 0, localhost, executor driver): java.net.SocketException: Connection reset by peer: socket write error
Here is my code:
#########################! importing libraries !########################
from __future__ import print_function
from pyspark.ml.linalg import Vectors
from pyspark.sql import SparkSession
from pyspark import SparkConf, SparkContext
from pyspark.ml.feature import PCA, VectorAssembler
from pyspark.mllib.linalg import Vectors
from pyspark.ml import Pipeline
from pyspark.sql import SQLContext
from pyspark import SparkContext
from pyspark.mllib.feature import Normalizer
import timeit
########################! main script !#################################
sc = SparkContext("local", "pca-app")
sqlContext = SQLContext(sc)
if __name__ == "__main__":
    spark = SparkSession\
        .builder\
        .appName("PCAExample")\
        .getOrCreate()
    start = timeit.default_timer()
    fileObj = sc.textFile('bigiris.csv')
    data = fileObj.map(lambda line: [float(k) for k in line.split(';')])
    columns = (fileObj.first()).split(';')
    df = spark.createDataFrame(data, columns)
    df.show()
    vecAssembler = VectorAssembler(inputCols=columns, outputCol="features")
    pca = PCA(k=2, inputCol="features", outputCol="pcaFeatures")
    pipeline = Pipeline(stages=[vecAssembler, pca])
    model = pipeline.fit(df)
    result = model.transform(df).select("pcaFeatures")
    stop = timeit.default_timer()
    result.show(truncate=False)
    time = stop - start
    print("this operation takes ", (time), " seconds")
    spark.stop()
Why is my execution irregular, and what should I add to fix this problem?
You are not filtering out the header when you create the data frame. Assuming your column names are strings, this will cause an error, because the column names cannot be converted to float values. See the modified part of the script below, which uses filter to remove the header.
fileObj = sc.textFile('e:/iris.data.txt')
header = fileObj.first()  # the first line of the file holds the column names
data = fileObj.filter(lambda x: x != header).map(lambda line: [float(k) for k in line.split(';')])  # drop the header, then parse
columns = header.split(';')
df = spark.createDataFrame(data, columns)
df.show()
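As a side note (not part of the original answer, just a sketch assuming Spark 2.x and a ';'-delimited file), the same result can be obtained by letting the DataFrame csv reader drop the header and infer numeric types:

df = spark.read.csv('e:/iris.data.txt', sep=';', header=True, inferSchema=True)  # header row becomes column names
df.show()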
The error here points to the line columns = (fileObj.first()).split(';'). Basically, you are trying to split the first row of fileObj on (;). The order of operations here is wrong, because the rows have already been converted to lists in the previous step.
The correct order of operations is this (the columns line should come before the data line):
fileObj = sc.textFile('bigiris.csv')
columns = (fileObj.first()).split(';')
data = fileObj.map(lambda line: [float(k) for k in line.split(';')])
df = spark.createDataFrame(data, columns)
Reason for the error: the (data =) line contains fileObj.map with line.split(';'), which already splits every csv row on (;).
If you have a header as text in the csv and want to remove it from the data, follow Jaco's answer and use filter(lambda x: x != header).
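For completeness, here is a minimal sketch that combines both fixes from the answers above (read the column names from the header first, drop the header row, then run the VectorAssembler/PCA pipeline); the file name and ';' separator are taken from the question:

fileObj = sc.textFile('bigiris.csv')
header = fileObj.first()                 # first line holds the column names
columns = header.split(';')
data = fileObj.filter(lambda x: x != header) \
              .map(lambda line: [float(k) for k in line.split(';')])  # keep only numeric rows
df = spark.createDataFrame(data, columns)
vecAssembler = VectorAssembler(inputCols=columns, outputCol="features")
pca = PCA(k=2, inputCol="features", outputCol="pcaFeatures")
model = Pipeline(stages=[vecAssembler, pca]).fit(df)
model.transform(df).select("pcaFeatures").show(truncate=False)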