运行 3000 多个随机森林模型,按组使用 Spark MLlib Scala API

Run 3000+ Random Forest Models By Group Using Spark MLlib Scala API

我正在尝试使用 Spark Scala API 在大型模型输入 csv 文件上按组(School_ID,超过 3000 个)构建随机森林模型。每个组包含大约 3000-4000 条记录。我拥有的资源是 20-30 个 aws m3.2xlarge 实例。

在 R 中,我可以按组构建模型并将它们保存到这样的列表中 -

library(dplyr);library(randomForest);
    Rf_model <- train %>% group_by(School_ID) %>% 
                do(school= randomForest(formula=Rf_formula, data=., importance = TRUE))

列表可以存储在某个地方,当我需要使用它们时我可以调用它们,如下所示 -

save(Rf_model.school,file=paste0(Modelpath,"Rf_model.dat"))
load(file=paste0(Modelpath,"Rf_model.dat"))
pred <-  predict(Rf_model.school$school[school_index][[1]], newdata=test)

我想知道如何在 Spark 中执行此操作,我是否需要先按组拆分数据,以及如何在必要时高效地执行此操作。

我能够根据以下代码按 School_ID 拆分文件,但它似乎为每次迭代创建了一个单独的作业子集,并且需要很长时间才能完成这些作业。有没有办法一次性搞定?

model_input.cache()

val schools = model_input.select("School_ID").distinct.collect.flatMap(_.toSeq)
val bySchoolArray = schools.map(School_ID => model_input.where($"School_ID" <=> School_ID))

for( i <- 0 to programs.length - 1 ){
  bySchoolArray(i).
    write.format("com.databricks.spark.csv").
    option("header", "true").
    save("model_input_bySchool/model_input_"+ schools(i))
}

来源: How can I split a dataframe into dataframes with same column values in SCALA and SPARK

编辑 2015 年 8 月 24 日 我正在尝试将我的数据帧转换为随机森林模型接受的格式。我正在按照此线程上的说明进行操作

基本上,我创建了一个新变量 "label" 并将我的 class 存储在 Double 中。然后我使用 VectorAssembler 函数组合我的所有特征并按如下方式转换我的输入数据-

val assembler = new VectorAssembler().
  setInputCols(Array("COL1", "COL2", "COL3")).
  setOutputCol("features")

val model_input = assembler.transform(model_input_raw).
  select("SCHOOL_ID", "label", "features")

部分错误消息(如果您需要完整的日志消息,请告诉我)-

scala.MatchError: StringType (of class org.apache.spark.sql.types.StringType$) at org.apache.spark.ml.feature.VectorAssembler$$anonfun.apply(VectorAssembler.scala:57)

将所有变量转换为数字类型后解决。

编辑 2015 年 8 月 25 日 ml 模型不接受我手动编码的标签,因此我需要使用 StringIndexer 来解决问题 . According to the official documentation,最常见的标签为 0。它会导致 School_ID 中的标签不一致。我想知道是否有一种方法可以在不重置值顺序的情况下创建标签。

val indexer = new StringIndexer().
  setInputCol("label_orig").
  setOutputCol("label")

任何建议或指示都会有所帮助,请随时提出任何问题。谢谢!

由于您已经为每所学校创建了单独的数据框,因此这里无需做太多工作。既然你是数据框,我假设你想使用 ml.classification.RandomForestClassifier。如果是这样,你可以尝试这样的事情:

  1. 提取管道逻辑。根据您的要求调整 RandomForestClassifier 参数和转换器

    import org.apache.spark.sql.DataFrame
    import org.apache.spark.ml.classification.RandomForestClassifier
    import org.apache.spark.ml.{Pipeline, PipelineModel}
    
    def trainModel(df: DataFrame): PipelineModel = {
       val rf  = new RandomForestClassifier()
       val pipeline = new Pipeline().setStages(Array(rf))
       pipeline.fit(df)
    }
    
  2. 在每个子集上训练模型

    val bySchoolArrayModels = bySchoolArray.map(df => trainModel(df))
    
  3. 保存模型

    import java.io._
    
    def saveModel(name: String, model: PipelineModel) = {
      val oos = new ObjectOutputStream(new FileOutputStream(s"/some/path/$name"))
      oos.writeObject(model)
      oos.close
    }
    
    schools.zip(bySchoolArrayModels).foreach{
      case (name, model) => saveModel(name, Model)
    }
    
  4. 可选:由于单个子集相当小,您可以尝试一种类似于我描述的方法 来提交多个任务同时

  5. 如果用mllib.tree.model.RandomForestModel可以省略3.直接用model.save。由于反序列化似乎存在一些问题(How to deserialize Pipeline model in spark.ml? - 据我所知它工作得很好但比抱歉更安全,我想)它可能是一种首选方法。

编辑

根据the official documentation

VectorAssembler accepts the following input column types: all numeric types, boolean type, and vector type.

由于错误指示您的列是 String,您应该首先对其进行转换,例如使用 StringIndexer.