pyspark error while using loadLabeledPoints RDD
I am using pyspark.
I read a libsvm file, transpose it, and then save it again.
I save each data row as a LabeledPoint object with sparse features.
I tried saving the data with MLUtils.saveAsLibSVMFile and then reading the files back with MLUtils.loadLibSVMFile, but I get the following error:
ValueError: could not convert string to float: [
    at org.apache.spark.api.python.PythonRunner$$anon.read(PythonRDD.scala:193)
    at org.apache.spark.api.python.PythonRunner$$anon.(PythonRDD.scala:234)
    at org.apache.spark.api.python.PythonRunner.compute(PythonRDD.scala:152)
    at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:63)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
    at org.apache.spark.rdd.RDD$$anonfun.apply(RDD.scala:336)
    at org.apache.spark.rdd.RDD$$anonfun.apply(RDD.scala:334)
    at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator.apply(BlockManager.scala:1055)
    at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator.apply(BlockManager.scala:1029)
    at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:969)
    at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1029)
    at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:760)
    at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:334)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:285)
    at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:63)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
    at org.apache.spark.scheduler.Task.run(Task.scala:108)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:335)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    ... 1 more
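For context, this is roughly the pattern of calls I am making (toy stand-in data, a placeholder output path, and the pyspark shell's sc, not my real pipeline):

from pyspark.mllib.util import MLUtils
from pyspark.mllib.regression import LabeledPoint
from pyspark.mllib.linalg import Vectors

# Toy stand-in for the transposed data: LabeledPoints with sparse features.
points = sc.parallelize([
    LabeledPoint(0.0, Vectors.sparse(3, {0: 1.0, 2: 3.0})),
    LabeledPoint(1.0, Vectors.sparse(3, {1: 2.0})),
])

# Save in libsvm format, then try to read it back; the load step is where my
# real job fails with the ValueError above.
MLUtils.saveAsLibSVMFile(points, "/tmp/transposed_libsvm")
reloaded = MLUtils.loadLibSVMFile(sc, "/tmp/transposed_libsvm")
reloaded.count()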
I read on the MLUtils page that if you want to use loadLabeledPoints you need to save the data with RDD.saveAsTextFile, but when I do that I get:
17/08/10 16:55:51 WARN TaskSetManager: Lost task 1.0 in stage 1.0 (TID 3, 192.168.1.205, executor 0): org.apache.spark.SparkException: Cannot parse a double from: [
    at org.apache.spark.mllib.util.NumericParser$.parseDouble(NumericParser.scala:120)
    at org.apache.spark.mllib.util.NumericParser$.parseArray(NumericParser.scala:70)
    at org.apache.spark.mllib.util.NumericParser$.parseTuple(NumericParser.scala:91)
    at org.apache.spark.mllib.util.NumericParser$.parse(NumericParser.scala:41)
    at org.apache.spark.mllib.regression.LabeledPoint$.parse(LabeledPoint.scala:62)
    at org.apache.spark.mllib.util.MLUtils$$anonfun$loadLabeledPoints.apply(MLUtils.scala:195)
    at org.apache.spark.mllib.util.MLUtils$$anonfun$loadLabeledPoints.apply(MLUtils.scala:195)
    at scala.collection.Iterator$$anon.next(Iterator.scala:409)
    at org.apache.spark.api.python.SerDeUtil$AutoBatchedPickler.next(SerDeUtil.scala:121)
    at org.apache.spark.api.python.SerDeUtil$AutoBatchedPickler.next(SerDeUtil.scala:112)
    at scala.collection.Iterator$class.foreach(Iterator.scala:893)
    at org.apache.spark.api.python.SerDeUtil$AutoBatchedPickler.foreach(SerDeUtil.scala:112)
    at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:59)
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:104)
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:48)
    at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:310)
    at org.apache.spark.api.python.SerDeUtil$AutoBatchedPickler.to(SerDeUtil.scala:112)
    at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:302)
    at org.apache.spark.api.python.SerDeUtil$AutoBatchedPickler.toBuffer(SerDeUtil.scala:112)
    at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:289)
    at org.apache.spark.api.python.SerDeUtil$AutoBatchedPickler.toArray(SerDeUtil.scala:112)
    at org.apache.spark.rdd.RDD$$anonfun$collect$$anonfun.apply(RDD.scala:936)
    at org.apache.spark.rdd.RDD$$anonfun$collect$$anonfun.apply(RDD.scala:936)
    at org.apache.spark.SparkContext$$anonfun$runJob.apply(SparkContext.scala:2062)
    at org.apache.spark.SparkContext$$anonfun$runJob.apply(SparkContext.scala:2062)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
    at org.apache.spark.scheduler.Task.run(Task.scala:108)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:335)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.NumberFormatException: For input string: "["
    at sun.misc.FloatingDecimal.readJavaFormatString(FloatingDecimal.java:2043)
    at sun.misc.FloatingDecimal.parseDouble(FloatingDecimal.java:110)
    at java.lang.Double.parseDouble(Double.java:538)
    at org.apache.spark.mllib.util.NumericParser$.parseDouble(NumericParser.scala:117)
    ... 30 more
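Again, roughly what this second attempt looks like (same toy points RDD as in the sketch above, placeholder path):

from pyspark.mllib.util import MLUtils

# saveAsTextFile writes the string form of each LabeledPoint; loadLabeledPoints
# then tries, and fails, to parse those lines back into labeled points.
points.saveAsTextFile("/tmp/transposed_points")
reloaded = MLUtils.loadLabeledPoints(sc, "/tmp/transposed_points")
reloaded.count()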
How can I save an RDD of labeled points in libsvm format and then load it from disk using pyspark?
Thanks
The problem was that writing the LabeledPoints to a file does not produce the libsvm format, and the result is then hard to read back.
I solved it by building the labeled points in memory and, before writing them out, converting each one into a libsvm-format string and saving those strings as text; after that I was able to read the data back in libsvm format:
def pointToLibsvmRow(point):
    # The flat feature array packs all indices first and all values second:
    # reshape it into two rows and pair them up column-wise as (index, value).
    s = point.features.reshape(2, -1, order="C").transpose().astype("str")
    # Build a libsvm line: "<label> <index>:<value> <index>:<value> ..."
    pairs = [str(int(float(point.label)))] + ["%s:%s" % (str(int(float(a))), b) for a, b in s.tolist()]
    st = " ".join(pairs)
    return st
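Roughly how I then use it (labeled_points_rdd is a placeholder name for the labeled-point RDD I build in memory, with features laid out the way pointToLibsvmRow expects, and the path is a placeholder):

from pyspark.mllib.util import MLUtils

# Turn every labeled point into a libsvm-formatted line and write plain text.
libsvm_lines = labeled_points_rdd.map(pointToLibsvmRow)
libsvm_lines.saveAsTextFile("/tmp/transposed_libsvm_text")

# The text now parses cleanly as libsvm.
reloaded = MLUtils.loadLibSVMFile(sc, "/tmp/transposed_libsvm_text")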