在 Apache Spark 中为具有大量列的数据集创建 ml 管道的最佳方法

Optimal way to create a ml pipeline in Apache Spark for dataset with high number of columns

我正在使用 Spark 2.1.1 处理具有约 2000 个特征的数据集,并尝试创建一个基本的 ML 管道,其中包含一些转换器和一个分类器。

为了简单起见,我们假设我正在使用的管道由一个 VectorAssembler、StringIndexer 和一个分类器组成,这将是一个相当常见的用例。

// Pipeline elements
val assmbleFeatures: VectorAssembler = new VectorAssembler()
  .setInputCols(featureColumns)
  .setOutputCol("featuresRaw")

val labelIndexer: StringIndexer = new StringIndexer()
  .setInputCol("TARGET")
  .setOutputCol("indexedLabel")

// Train a RandomForest model.
val rf: RandomForestClassifier = new RandomForestClassifier()
  .setLabelCol("indexedLabel")
  .setFeaturesCol("featuresRaw")
  .setMaxBins(30)

// add the params, unique to this classifier
val paramGrid = new ParamGridBuilder()
  .addGrid(rf.numTrees, Array(5))
  .addGrid(rf.maxDepth, Array(5))
  .build()

// Treat the Pipeline as an Estimator, to jointly choose parameters for all Pipeline stages.
val evaluator = new BinaryClassificationEvaluator()
  .setMetricName("areaUnderROC")
  .setLabelCol("indexedLabel")

如果流水线步骤被分成转换器流水线(VectorAssembler + StringIndexer)和第二个分类器流水线,并且如果不必要的列被丢弃在两个流水线之间,则训练成功。 这意味着要重用模型,必须在训练后保存两个 PipelineModel,并且必须引入中间预处理步骤。

// Split indexers and forest in two Pipelines.
val prePipeline = new Pipeline().setStages(Array(labelIndexer, assmbleFeatures)).fit(dfTrain)
// Transform data and drop all columns, except those needed for training 
val dfTrainT = prePipeline.transform(dfTrain)
val columnsToDrop = dfTrainT.columns.filter(col => !Array("featuresRaw", "indexedLabel").contains(col))
val dfTrainRdy = dfTrainT.drop(columnsToDrop:_*)

val mainPipeline = new Pipeline().setStages(Array(rf))

val cv = new CrossValidator()
  .setEstimator(mainPipeline)
  .setEvaluator(evaluator)
  .setEstimatorParamMaps(paramGrid)
  .setNumFolds(2)

val bestModel = cv.fit(dfTrainRdy).bestModel.asInstanceOf[PipelineModel]

(恕我直言)更简洁的解决方案是将所有管道阶段合并到一个管道中。

val pipeline = new Pipeline()
  .setStages(Array(labelIndexer, assmbleFeatures, rf))

val cv = new CrossValidator()
  .setEstimator(pipeline)
  .setEvaluator(evaluator)
  .setEstimatorParamMaps(paramGrid)
  .setNumFolds(2)

// This will fail! 
val bestModel = cv.fit(dfTrain).bestModel.asInstanceOf[PipelineModel]

但是,将所有 PipelineStages 放入一个 Pipeline 会导致以下异常,可能是由于 this PR 最终会解决的问题:

ERROR CodeGenerator: failed to compile: org.codehaus.janino.JaninoRuntimeException: Constant pool for class org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection has grown past JVM limit of 0xFFFF

原因是 VectorAssembler 有效地使 DataFrame 中的数据量加倍(在此示例中),因为没有转换器可以删除不必要的列。 (参见

该示例适用于 golub dataset,需要进行以下预处理步骤:

import org.apache.spark.sql.types.DoubleType
import org.apache.spark.ml.classification.RandomForestClassifier
import org.apache.spark.ml.{Pipeline, PipelineModel, PipelineStage}
import org.apache.spark.ml.evaluation.BinaryClassificationEvaluator
import org.apache.spark.ml.feature._
import org.apache.spark.sql._
import org.apache.spark.ml.tuning.{CrossValidator, ParamGridBuilder}

val df = spark.read.option("header", true).option("inferSchema", true).csv("/path/to/dataset/golub_merged.csv").drop("_c0").repartition(100)

// Those steps are necessary, otherwise training would fail either way
val colsToDrop = df.columns.take(5000)
val dfValid = df.withColumn("TARGET", df("TARGET_REAL").cast(DoubleType)).drop("TARGET_REAL").drop(colsToDrop:_*)

// Split df in train and test sets
val Array(dfTrain, dfTest) = dfValid.randomSplit(Array(0.7, 0.3))

// Feature columns are columns except "TARGET"
val featureColumns = dfTrain.columns.filter(col => col != "TARGET")

由于我是 Spark 的新手,我不确定解决此问题的最佳方法是什么。你会建议...

  1. 创建一个新的转换器,它会删除列并且可以合并到管道中?
  2. 拆分两个管道并引入中间步骤
  3. 还有什么吗? :)

或者我是否遗漏了任何可以解决此问题的重要内容(管道步骤、PR 等)?


编辑:

我实现了一个新的 Transformer DroppingVectorAssembler,它删除了不必要的列,但是,抛出了同样的异常。

除此之外,将 spark.sql.codegen.wholeStage 设置为 false 并不能解决问题。

您遇到的 janino 错误是因为根据功能集,生成的代码会变大。

我会将这些步骤分成不同的管道并删除不必要的功能,保存 StringIndexerOneHotEncoder 等中间模型并在预测阶段加载它们,这也很有用,因为转换会对于必须预测的数据更快。

最后,您不需要在 运行 VectorAssembler 阶段之后保留特征列,因为它将特征转换为 feature vectorlabel 列,并且这就是 运行 预测所需的全部内容。

Example of Pipeline in Scala with saving of intermediate steps-(Older spark API)

此外,如果您使用的是 1.6.0 等旧版本的 spark,则需要检查补丁版本,即 2.1.1 或 2.2.0 或 1.6.4,否则您会点击 Janino即使有大约 400 个特征列也会出错。

janino 错误是由于在优化程序过程中创建的常量变量的数量。 JVM 中允许的常量变量的最大限制是 ((2^16) -1)。如果超过这个限制,那么你会得到 Constant pool for class ... has grown past JVM limit of 0xFFFF

将解决此问题的 JIRA 是 SPARK-18016,但目前仍在进行中。

您的代码很可能在 VectorAssembler 阶段失败,此时它必须在单个优化任务中针对数千列执行。

我为这个问题开发的解决方法是创建一个 "vector of vectors",方法是针对列的子集进行处理,然后在最后将结果放在一起以创建一个单一的特征向量。这可以防止任何单个优化任务超过 JVM 常量限制。它并不优雅,但我已经在达到 10k 列范围的数据集上使用了它。

此方法还允许您仍然保留单个管道,但它需要一些额外的步骤才能使其工作(创建子向量)。从子向量创建特征向量后,您可以根据需要删除原始源列。

示例代码:

// IMPORT DEPENDENCIES
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
import org.apache.spark.sql.{SQLContext, Row, DataFrame, Column}
import org.apache.spark.ml.feature.VectorAssembler
import org.apache.spark.ml.{Pipeline, PipelineModel}

// Create first example dataframe
val exampleDF = spark.createDataFrame(Seq(
  (1, 1, 2, 3, 8, 4, 5, 1, 3, 2, 0, 4, 2, 8, 1, 1, 2, 3, 8, 4, 5),
  (2, 4, 3, 8, 7, 9, 8, 2, 3, 3, 2, 6, 5, 4, 2, 4, 3, 8, 7, 9, 8),
  (3, 6, 1, 9, 2, 3, 6, 3, 8, 5, 1, 2, 3, 5, 3, 6, 1, 9, 2, 3, 6),
  (4, 7, 8, 6, 9, 4, 5, 4, 9, 8, 2, 4, 9, 2, 4, 7, 8, 6, 9, 4, 5),
  (5, 9, 2, 7, 8, 7, 3, 5, 3, 4, 8, 0, 6, 2, 5, 9, 2, 7, 8, 7, 3),
  (6, 1, 1, 4, 2, 8, 4, 6, 3, 9, 8, 8, 9, 3, 6, 1, 1, 4, 2, 8, 4)
)).toDF("uid", "col1", "col2", "col3", "col4", "col5", 
        "col6", "col7", "col8", "col9", "colA", "colB", 
        "colC", "colD", "colE", "colF", "colG", "colH", 
        "colI", "colJ", "colK")

// Create multiple column lists using the sliding method
val Array(colList1, colList2, colList3, colList4) = exampleDF.columns.filter(_ != "uid").sliding(5,5).toArray

// Create a vector assembler for each column list
val colList1_assembler = new VectorAssembler().setInputCols(colList1).setOutputCol("colList1_vec")
val colList2_assembler = new VectorAssembler().setInputCols(colList2).setOutputCol("colList2_vec")
val colList3_assembler = new VectorAssembler().setInputCols(colList3).setOutputCol("colList3_vec")
val colList4_assembler = new VectorAssembler().setInputCols(colList4).setOutputCol("colList4_vec")

// Create a vector assembler using column list vectors as input
val features_assembler = new VectorAssembler().setInputCols(Array("colList1_vec","colList2_vec","colList3_vec","colList4_vec")).setOutputCol("features")

// Create the pipeline with column list vector assemblers first, then the final vector of vectors assembler last
val pipeline = new Pipeline().setStages(Array(colList1_assembler,colList2_assembler,colList3_assembler,colList4_assembler,features_assembler))

// Fit and transform the data
val featuresDF = pipeline.fit(exampleDF).transform(exampleDF)

// Get the number of features in "features" vector
val featureLength = (featuresDF.schema(featuresDF.schema.fieldIndex("features")).metadata.getMetadata("ml_attr").getLong("num_attrs"))

// Print number of features in "features vector"
print(featureLength)

(注意:创建列列表的方法实际上应该以编程方式完成,但为了理解概念,我让这个示例保持简单。)