将 VectorAssembler 添加到 Spark ML 管道时出错

Error adding VectorAssembler to Spark ML Pipeline

尝试将 VectorAssembler 添加到 GBT 流水线示例并出现错误,流水线无法找到特征字段。我引入了示例文件而不是 libsvm,因此我需要转换功能集。

错误: 线程 "main" java.lang.IllegalArgumentException 中的异常:字段 "features" 不存在。

 val df = sqlContext.read
  .format("com.databricks.spark.csv")
  .option("header", "true")
  .option("inferSchema", "true")
  .load("data/training_example.csv")

val sampleDF = df.sample(false,0.05,987897L)

val assembler = new VectorAssembler()
  .setInputCols(Array("val1","val2","val3",...,"valN"))
  .setOutputCol("features")

val labelIndexer = new StringIndexer()
  .setInputCol("label")
  .setOutputCol("indexedLabel")
  .fit(sampleDF)

val featureIndexer = new VectorIndexer()
  .setInputCol("features")
  .setOutputCol("indexedFeatures")
  .setMaxCategories(4)
  .fit(sampleDF)

val Array(trainingData, testData) = sampleDF.randomSplit(Array(0.7, 0.3))

val gbt = new GBTClassifier()
  .setLabelCol("indexedLabel")
  .setFeaturesCol("indexedFeatures")
  .setMaxIter(3)
  .setMaxDepth(5)

val pipeline = new Pipeline()
  .setStages(Array(assembler,labelIndexer,featureIndexer,gbt))

val model = pipeline.fit(trainingData)

val predictions = model.transform(testData)

predictions.show(10)

基本问题:

为什么要在 featureIndexer 中调用 fit()?

如果调用 fit(sampleDF),VectorIndexer 将在 sampleDF 中搜索特征列,但此数据集没有此类列。

Pipeline的fit()会调用所有的transformator和estimator,所以在assembler上调用fit,然后把结果传给labelIndexer的fit,把上一步的结果传给featureIndexer的fit。

将在管道内部调用的 featureIndexer.fit() 中使用的 DataFrame 将包含之前转换器生成的所有列。

在您的代码示例中,DF 没有特性列,但是,在 Pipeline fit() 过程中,该列将由汇编程序添加

文档示例从一开始就有功能列。

val data = sqlContext.read.format("libsvm").load("data/mllib/sample_libsvm_data.txt")

您必须适合具有特征的 DF column.So 使用 VectorAssembler 转换您的原始 DF 并将其作为输入。