How to fit Spark's classifier in parallel?

Guys, I have a strange problem...

I am trying to train a multiclass SVM classifier like this:

JavaPairRDD<Tuple2<String, String>, SVMModel> jp = scmap.mapToPair(
    new PairFunction<Tuple2<Tuple2<String, String>, RDD<LabeledPoint>>, Tuple2<String, String>, SVMModel>() {
        @Override
        public Tuple2<Tuple2<String, String>, SVMModel> call(
                Tuple2<Tuple2<String, String>, RDD<LabeledPoint>> tup) {
            SVMWithSGD svmAlg = new SVMWithSGD();
            svmAlg.optimizer()
                  .setNumIterations(100)
                  .setRegParam(0.1)
                  .setUpdater(new SquaredL2Updater());
            final SVMModel model = svmAlg.run(tup._2());
            model.clearThreshold();
            return new Tuple2<Tuple2<String, String>, SVMModel>(tup._1(), model);
        }
    });

But when I try to collect() jp, I get this error:

15/01/16 20:06:30 WARN scheduler.TaskSetManager: Lost task 7.0 in stage 5.0 (TID 147, fujitsu11.in.nu): java.lang.NullPointerException: 
        org.apache.spark.rdd.ParallelCollectionRDD$.slice(ParallelCollectionRDD.scala:157)
    org.apache.spark.rdd.ParallelCollectionRDD.getPartitions(ParallelCollectionRDD.scala:97)
    org.apache.spark.rdd.RDD$$anonfun$partitions.apply(RDD.scala:204)
    org.apache.spark.rdd.RDD$$anonfun$partitions.apply(RDD.scala:202)
    scala.Option.getOrElse(Option.scala:120)
    org.apache.spark.rdd.RDD.partitions(RDD.scala:202)
    org.apache.spark.rdd.RDD.take(RDD.scala:1060)
    org.apache.spark.rdd.RDD.first(RDD.scala:1092)
    org.apache.spark.mllib.regression.GeneralizedLinearAlgorithm.run(GeneralizedLinearAlgorithm.scala:141)
    maven.maven1.App.call(App.java:430)
    maven.maven1.App.call(App.java:1)
    org.apache.spark.api.java.JavaPairRDD$$anonfun$pairFunToScalaFun.apply(JavaPairRDD.scala:926)
    org.apache.spark.api.java.JavaPairRDD$$anonfun$pairFunToScalaFun.apply(JavaPairRDD.scala:926)
    scala.collection.Iterator$$anon.next(Iterator.scala:328)
    scala.collection.Iterator$class.foreach(Iterator.scala:727)
    scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
    scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
    scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
    scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
    scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
    scala.collection.AbstractIterator.to(Iterator.scala:1157)
    scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
    scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
    scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
    scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
    org.apache.spark.rdd.RDD$$anonfun.apply(RDD.scala:774)
    org.apache.spark.rdd.RDD$$anonfun.apply(RDD.scala:774)
    org.apache.spark.SparkContext$$anonfun$runJob.apply(SparkContext.scala:1121)
    org.apache.spark.SparkContext$$anonfun$runJob.apply(SparkContext.scala:1121)
    org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:62)
    org.apache.spark.scheduler.Task.run(Task.scala:54)
    org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:177)
    java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    java.lang.Thread.run(Thread.java:745)
15/01/16 20:06:30 INFO scheduler.TaskSetManager: Starting task 7.1 in stage 5.0 (TID 148, fujitsu11.in.nu, PROCESS_LOCAL, 2219 bytes)
15/01/16 20:06:30 INFO scheduler.TaskSetManager: Lost task 7.1 in stage 5.0 (TID 148) on executor fujitsu11.in.nu: java.lang.NullPointerException (null) [duplicate 1]
15/01/16 20:06:30 INFO scheduler.TaskSetManager: Starting task 7.2 in stage 5.0 (TID 149, fujitsu11.in.nu, PROCESS_LOCAL, 2219 bytes)
15/01/16 20:06:30 INFO scheduler.TaskSetManager: Lost task 7.2 in stage 5.0 (TID 149) on executor fujitsu11.in.nu: java.lang.NullPointerException (null) [duplicate 2]
15/01/16 20:06:30 INFO scheduler.TaskSetManager: Starting task 7.3 in stage 5.0 (TID 150, fujitsu11.in.nu, PROCESS_LOCAL, 2219 bytes)
15/01/16 20:06:30 INFO scheduler.TaskSetManager: Lost task 7.3 in stage 5.0 (TID 150) on executor fujitsu11.in.nu: java.lang.NullPointerException (null) [duplicate 3]
15/01/16 20:06:30 ERROR scheduler.TaskSetManager: Task 7 in stage 5.0 failed 4 times; aborting job
15/01/16 20:06:30 INFO scheduler.TaskSchedulerImpl: Removed TaskSet 5.0, whose tasks have all completed, from pool 
15/01/16 20:06:30 INFO scheduler.TaskSchedulerImpl: Cancelling stage 5
15/01/16 20:06:30 INFO scheduler.DAGScheduler: Failed to run collectAsMap at App.java:452

Why do I get a NullPointerException? I checked several times that my

RDD<LabeledPoint>

Tuple2<String, String> 

are not null. Could it be that the classifiers can't be trained in parallel on the workers?

Thanks.

You can't run a distributed operation inside another distributed operation. An RDD and its SparkContext only exist on the driver; when a worker task tries to use an RDD reference that was serialized into it (here, SVMWithSGD.run() calling first() on your nested RDD<LabeledPoint>, as the stack trace shows), it hits a NullPointerException. But your first mapToPair doesn't have to be a distributed operation: just .par.map over a collection that is local to the driver, where each element spawns a distributed operation to fit one model.
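For instance, here is a minimal sketch of that idea in Java, assuming Java 8 streams are available (parallelStream() is the Java analogue of Scala's .par) and that the per-key training sets are kept in an ordinary driver-local Map rather than inside an RDD; TrainAllModels and trainingSets are hypothetical names, not anything from your code:

    import java.util.Map;
    import java.util.stream.Collectors;

    import org.apache.spark.mllib.classification.SVMModel;
    import org.apache.spark.mllib.classification.SVMWithSGD;
    import org.apache.spark.mllib.optimization.SquaredL2Updater;
    import org.apache.spark.mllib.regression.LabeledPoint;
    import org.apache.spark.rdd.RDD;

    import scala.Tuple2;

    public class TrainAllModels {

        // trainingSets is a plain Map built on the driver -- the RDDs are
        // values in a local collection, never elements of another RDD
        public static Map<Tuple2<String, String>, SVMModel> train(
                Map<Tuple2<String, String>, RDD<LabeledPoint>> trainingSets) {
            return trainingSets.entrySet()
                .parallelStream()   // local driver threads, not executor tasks
                .collect(Collectors.toMap(
                    Map.Entry::getKey,
                    entry -> {
                        SVMWithSGD svmAlg = new SVMWithSGD();
                        svmAlg.optimizer()
                              .setNumIterations(100)
                              .setRegParam(0.1)
                              .setUpdater(new SquaredL2Updater());
                        // run() executes on the driver, so it is free to
                        // launch its own distributed job on the cluster
                        SVMModel model = svmAlg.run(entry.getValue());
                        model.clearThreshold();
                        return model;
                    }));
        }
    }

Each call to run() still launches a distributed job, and jobs submitted from different driver threads run concurrently, so the models are still fit in parallel; the orchestration just happens on the driver instead of inside a task. On Java 7 a plain loop or an ExecutorService achieves the same thing.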