如何并行拟合 Spark 的分类器?
How to fit Spark's classifier in parallel?
伙计们,我有一个奇怪的问题...
我正在尝试像这样训练多类 SVM 分类器:
// Intended: train one binary SVM model per entry of `scmap` and pair each key
// (a Tuple2<String, String>, presumably a pair of class labels used to build a
// multiclass classifier -- TODO confirm) with its fitted SVMModel.
//
// NOTE(review): this cannot work as written. The PairFunction below runs inside
// tasks on the executors, and svmAlg.run(...) starts another distributed job on
// the RDD<LabeledPoint> held as a value of `scmap`. Spark does not support
// invoking distributed operations (or using RDDs nested as values of another RDD)
// inside a transformation. The reported failure is a NullPointerException in
// ParallelCollectionRDD.slice, reached via GeneralizedLinearAlgorithm.run ->
// RDD.first from within an executor task.
// Fix: keep the per-key training sets in a driver-local collection (e.g. a
// Map<Tuple2<String, String>, JavaRDD<LabeledPoint>>) and call svmAlg.run(...)
// for each entry from the driver, optionally from several driver threads so
// that the training jobs run concurrently.
JavaPairRDD<Tuple2<String, String>, SVMModel> jp = scmap.mapToPair(new PairFunction<Tuple2<Tuple2<String, String>, RDD<LabeledPoint>>,Tuple2<String, String>, SVMModel >(){
@Override
public Tuple2<Tuple2<String, String>, SVMModel> call(Tuple2<Tuple2<String, String>, RDD<LabeledPoint>> tup)
{
// Configure the SGD optimizer: 100 iterations, regularization parameter 0.1,
// squared-L2 regularization updater.
SVMWithSGD svmAlg = new SVMWithSGD();
svmAlg.optimizer()
.setNumIterations(100)
.setRegParam(0.1)
.setUpdater(new SquaredL2Updater());
// This call launches a distributed job on tup._2(); it is the line that
// fails when executed inside this executor-side function.
final SVMModel model = svmAlg.run(tup._2());
// Per the MLlib docs, clearThreshold() makes predict() return raw scores
// instead of 0/1 labels -- presumably so the per-key models' scores can be
// compared when combining them into a multiclass decision.
model.clearThreshold();
return new Tuple2<Tuple2<String, String>, SVMModel>(tup._1(), model);
}
});
但是当我对 jp 调用 collect() 时,出现了如下错误:
15/01/16 20:06:30 WARN scheduler.TaskSetManager: Lost task 7.0 in stage 5.0 (TID 147, fujitsu11.in.nu): java.lang.NullPointerException:
org.apache.spark.rdd.ParallelCollectionRDD$.slice(ParallelCollectionRDD.scala:157)
org.apache.spark.rdd.ParallelCollectionRDD.getPartitions(ParallelCollectionRDD.scala:97)
org.apache.spark.rdd.RDD$$anonfun$partitions.apply(RDD.scala:204)
org.apache.spark.rdd.RDD$$anonfun$partitions.apply(RDD.scala:202)
scala.Option.getOrElse(Option.scala:120)
org.apache.spark.rdd.RDD.partitions(RDD.scala:202)
org.apache.spark.rdd.RDD.take(RDD.scala:1060)
org.apache.spark.rdd.RDD.first(RDD.scala:1092)
org.apache.spark.mllib.regression.GeneralizedLinearAlgorithm.run(GeneralizedLinearAlgorithm.scala:141)
maven.maven1.App.call(App.java:430)
maven.maven1.App.call(App.java:1)
org.apache.spark.api.java.JavaPairRDD$$anonfun$pairFunToScalaFun.apply(JavaPairRDD.scala:926)
org.apache.spark.api.java.JavaPairRDD$$anonfun$pairFunToScalaFun.apply(JavaPairRDD.scala:926)
scala.collection.Iterator$$anon.next(Iterator.scala:328)
scala.collection.Iterator$class.foreach(Iterator.scala:727)
scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
scala.collection.AbstractIterator.to(Iterator.scala:1157)
scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
org.apache.spark.rdd.RDD$$anonfun.apply(RDD.scala:774)
org.apache.spark.rdd.RDD$$anonfun.apply(RDD.scala:774)
org.apache.spark.SparkContext$$anonfun$runJob.apply(SparkContext.scala:1121)
org.apache.spark.SparkContext$$anonfun$runJob.apply(SparkContext.scala:1121)
org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:62)
org.apache.spark.scheduler.Task.run(Task.scala:54)
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:177)
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
java.lang.Thread.run(Thread.java:745)
15/01/16 20:06:30 INFO scheduler.TaskSetManager: Starting task 7.1 in stage 5.0 (TID 148, fujitsu11.in.nu, PROCESS_LOCAL, 2219 bytes)
15/01/16 20:06:30 INFO scheduler.TaskSetManager: Lost task 7.1 in stage 5.0 (TID 148) on executor fujitsu11.in.nu: java.lang.NullPointerException (null) [duplicate 1]
15/01/16 20:06:30 INFO scheduler.TaskSetManager: Starting task 7.2 in stage 5.0 (TID 149, fujitsu11.in.nu, PROCESS_LOCAL, 2219 bytes)
15/01/16 20:06:30 INFO scheduler.TaskSetManager: Lost task 7.2 in stage 5.0 (TID 149) on executor fujitsu11.in.nu: java.lang.NullPointerException (null) [duplicate 2]
15/01/16 20:06:30 INFO scheduler.TaskSetManager: Starting task 7.3 in stage 5.0 (TID 150, fujitsu11.in.nu, PROCESS_LOCAL, 2219 bytes)
15/01/16 20:06:30 INFO scheduler.TaskSetManager: Lost task 7.3 in stage 5.0 (TID 150) on executor fujitsu11.in.nu: java.lang.NullPointerException (null) [duplicate 3]
15/01/16 20:06:30 ERROR scheduler.TaskSetManager: Task 7 in stage 5.0 failed 4 times; aborting job
15/01/16 20:06:30 INFO scheduler.TaskSchedulerImpl: Removed TaskSet 5.0, whose tasks have all completed, from pool
15/01/16 20:06:30 INFO scheduler.TaskSchedulerImpl: Cancelling stage 5
15/01/16 20:06:30 INFO scheduler.DAGScheduler: Failed to run collectAsMap at App.java:452
为什么会出现 NullPointerException?我检查了好几次,我的 `RDD<LabeledPoint>` 和 `Tuple2<String, String>` 都不为空。难道不能在 worker 节点上并行训练分类器吗?
谢谢。
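您不能在一个分布式操作内部再运行分布式操作。不过,您的第一个 mapToPair 并不需要是分布式操作:只需对驱动程序(driver)本地的集合使用 `.par.map`(Scala 并行集合),让其中每个元素各自发起一个分布式操作来拟合模型即可。在 Java 中,可以在 driver 端遍历一个本地的 Map/List(例如 `Map<Tuple2<String, String>, JavaRDD<LabeledPoint>>`),对每个条目调用 `svmAlg.run(...)`;如需并发,可以从多个 driver 线程分别提交这些训练任务。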
伙计们,我有一个奇怪的问题...
我正在尝试像这样训练多类 SVM 分类器:
// Intended: train one binary SVM model per entry of `scmap` and pair each key
// (a Tuple2<String, String>, presumably a pair of class labels used to build a
// multiclass classifier -- TODO confirm) with its fitted SVMModel.
//
// NOTE(review): this cannot work as written. The PairFunction below runs inside
// tasks on the executors, and svmAlg.run(...) starts another distributed job on
// the RDD<LabeledPoint> held as a value of `scmap`. Spark does not support
// invoking distributed operations (or using RDDs nested as values of another RDD)
// inside a transformation. The reported failure is a NullPointerException in
// ParallelCollectionRDD.slice, reached via GeneralizedLinearAlgorithm.run ->
// RDD.first from within an executor task.
// Fix: keep the per-key training sets in a driver-local collection (e.g. a
// Map<Tuple2<String, String>, JavaRDD<LabeledPoint>>) and call svmAlg.run(...)
// for each entry from the driver, optionally from several driver threads so
// that the training jobs run concurrently.
JavaPairRDD<Tuple2<String, String>, SVMModel> jp = scmap.mapToPair(new PairFunction<Tuple2<Tuple2<String, String>, RDD<LabeledPoint>>,Tuple2<String, String>, SVMModel >(){
@Override
public Tuple2<Tuple2<String, String>, SVMModel> call(Tuple2<Tuple2<String, String>, RDD<LabeledPoint>> tup)
{
// Configure the SGD optimizer: 100 iterations, regularization parameter 0.1,
// squared-L2 regularization updater.
SVMWithSGD svmAlg = new SVMWithSGD();
svmAlg.optimizer()
.setNumIterations(100)
.setRegParam(0.1)
.setUpdater(new SquaredL2Updater());
// This call launches a distributed job on tup._2(); it is the line that
// fails when executed inside this executor-side function.
final SVMModel model = svmAlg.run(tup._2());
// Per the MLlib docs, clearThreshold() makes predict() return raw scores
// instead of 0/1 labels -- presumably so the per-key models' scores can be
// compared when combining them into a multiclass decision.
model.clearThreshold();
return new Tuple2<Tuple2<String, String>, SVMModel>(tup._1(), model);
}
});
但是当我对 jp 调用 collect() 时,出现了如下错误:
15/01/16 20:06:30 WARN scheduler.TaskSetManager: Lost task 7.0 in stage 5.0 (TID 147, fujitsu11.in.nu): java.lang.NullPointerException:
org.apache.spark.rdd.ParallelCollectionRDD$.slice(ParallelCollectionRDD.scala:157)
org.apache.spark.rdd.ParallelCollectionRDD.getPartitions(ParallelCollectionRDD.scala:97)
org.apache.spark.rdd.RDD$$anonfun$partitions.apply(RDD.scala:204)
org.apache.spark.rdd.RDD$$anonfun$partitions.apply(RDD.scala:202)
scala.Option.getOrElse(Option.scala:120)
org.apache.spark.rdd.RDD.partitions(RDD.scala:202)
org.apache.spark.rdd.RDD.take(RDD.scala:1060)
org.apache.spark.rdd.RDD.first(RDD.scala:1092)
org.apache.spark.mllib.regression.GeneralizedLinearAlgorithm.run(GeneralizedLinearAlgorithm.scala:141)
maven.maven1.App.call(App.java:430)
maven.maven1.App.call(App.java:1)
org.apache.spark.api.java.JavaPairRDD$$anonfun$pairFunToScalaFun.apply(JavaPairRDD.scala:926)
org.apache.spark.api.java.JavaPairRDD$$anonfun$pairFunToScalaFun.apply(JavaPairRDD.scala:926)
scala.collection.Iterator$$anon.next(Iterator.scala:328)
scala.collection.Iterator$class.foreach(Iterator.scala:727)
scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
scala.collection.AbstractIterator.to(Iterator.scala:1157)
scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
org.apache.spark.rdd.RDD$$anonfun.apply(RDD.scala:774)
org.apache.spark.rdd.RDD$$anonfun.apply(RDD.scala:774)
org.apache.spark.SparkContext$$anonfun$runJob.apply(SparkContext.scala:1121)
org.apache.spark.SparkContext$$anonfun$runJob.apply(SparkContext.scala:1121)
org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:62)
org.apache.spark.scheduler.Task.run(Task.scala:54)
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:177)
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
java.lang.Thread.run(Thread.java:745)
15/01/16 20:06:30 INFO scheduler.TaskSetManager: Starting task 7.1 in stage 5.0 (TID 148, fujitsu11.in.nu, PROCESS_LOCAL, 2219 bytes)
15/01/16 20:06:30 INFO scheduler.TaskSetManager: Lost task 7.1 in stage 5.0 (TID 148) on executor fujitsu11.in.nu: java.lang.NullPointerException (null) [duplicate 1]
15/01/16 20:06:30 INFO scheduler.TaskSetManager: Starting task 7.2 in stage 5.0 (TID 149, fujitsu11.in.nu, PROCESS_LOCAL, 2219 bytes)
15/01/16 20:06:30 INFO scheduler.TaskSetManager: Lost task 7.2 in stage 5.0 (TID 149) on executor fujitsu11.in.nu: java.lang.NullPointerException (null) [duplicate 2]
15/01/16 20:06:30 INFO scheduler.TaskSetManager: Starting task 7.3 in stage 5.0 (TID 150, fujitsu11.in.nu, PROCESS_LOCAL, 2219 bytes)
15/01/16 20:06:30 INFO scheduler.TaskSetManager: Lost task 7.3 in stage 5.0 (TID 150) on executor fujitsu11.in.nu: java.lang.NullPointerException (null) [duplicate 3]
15/01/16 20:06:30 ERROR scheduler.TaskSetManager: Task 7 in stage 5.0 failed 4 times; aborting job
15/01/16 20:06:30 INFO scheduler.TaskSchedulerImpl: Removed TaskSet 5.0, whose tasks have all completed, from pool
15/01/16 20:06:30 INFO scheduler.TaskSchedulerImpl: Cancelling stage 5
15/01/16 20:06:30 INFO scheduler.DAGScheduler: Failed to run collectAsMap at App.java:452
为什么会出现 NullPointerException?我检查了好几次,我的 `RDD<LabeledPoint>` 和 `Tuple2<String, String>` 都不为空。难道不能在 worker 节点上并行训练分类器吗?
谢谢。
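您不能在一个分布式操作内部再运行分布式操作。不过,您的第一个 mapToPair 并不需要是分布式操作:只需对驱动程序(driver)本地的集合使用 `.par.map`(Scala 并行集合),让其中每个元素各自发起一个分布式操作来拟合模型即可。在 Java 中,可以在 driver 端遍历一个本地的 Map/List(例如 `Map<Tuple2<String, String>, JavaRDD<LabeledPoint>>`),对每个条目调用 `svmAlg.run(...)`;如需并发,可以从多个 driver 线程分别提交这些训练任务。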