Scala/Spark: Converting zero inflated data in dataframe to libsvm

I am new to Scala (normally I would do this in R).

I have imported a large, zero-inflated DataFrame (2000+ columns, 100,000+ rows).

Task: Convert the data to libsvm format
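For reference, each example in libsvm format is a single text line of the form label index1:value1 index2:value2 ..., listing only the non-zero features with 1-based indices, which is what makes it a good fit for zero-inflated data.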

Steps: As I understand it, the steps are as follows

  1. Ensure the feature columns are set to DoubleType and the target is an Int
  2. Iterate over each row, keeping every value > 0 in one array and the index of its column in another array
  3. Convert to an RDD[LabeledPoint]
  4. Save the RDD in libsvm format

I got stuck at step 3, though that may be because I am doing step 2 wrong.
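For reference, here is a minimal, self-contained sketch (hypothetical values, not my real code) of what steps 2 and 3 amount to for a single row of doubles:

  import org.apache.spark.mllib.linalg.Vectors
  import org.apache.spark.mllib.regression.LabeledPoint

  // a toy zero-inflated row of feature values
  val featureValues = Seq(0.0, 2.5, 0.0, 0.0, 1.0, 0.0)

  // keep each value > 0 together with the index of its column
  val (indices, values) = featureValues.zipWithIndex
    .collect { case (v, i) if v > 0 => (i, v) }
    .unzip

  // the first argument to Vectors.sparse is the total number of features
  val lp = LabeledPoint(1.0, Vectors.sparse(featureValues.length, indices.toArray, values.toArray))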

Here is my code:

The main function:

@Test
def testSpark(): Unit =
{
try
{

  val mDF: DataFrame = spark.read.option("header", "true").option("inferSchema", "true").csv("src/test/resources/knimeMergedTRimmedVariables.csv")


  val mDFTyped = castAllTypedColumnsTo(mDF, IntegerType, DoubleType)

  val indexer = new StringIndexer()
    .setInputCol("Majors_Final")
    .setOutputCol("Majors_Final_Indexed")
  val mDFTypedIndexed = indexer.fit(mDFTyped).transform(mDFTyped)
  val mDFFinal = castColumnTo(mDFTypedIndexed,"Majors_Final_Indexed", IntegerType)



  //only doubles accepted by sparse vector, so that's what we filter for
  val fieldSeq: scala.collection.Seq[StructField] = mDFFinal.schema.fields.toSeq.filter(f => f.dataType == DoubleType)

  val fieldNameSeq: Seq[String] = fieldSeq.map(f => f.name)


  val labeled:DataFrame = mDFFinal.map(row => convertRowToLabeledPoint(row,fieldNameSeq,row.getAs("Majors_Final_Indexed"))).toDF()


  assertTrue(true)
}
catch
{
  case ex: Exception =>
  {

    println(s"There has been an Exception. Message is ${ex.getMessage} and ${ex}")
    fail()
  }
  }
}

Converting each Row to a LabeledPoint:

 @throws(classOf[Exception])
private def convertRowToLabeledPoint(rowIn: Row, fieldNameSeq: Seq[String], label:Int): LabeledPoint =
{
  try
  {
    val values: Map[String, Double] = rowIn.getValuesMap(fieldNameSeq)

    val sortedValuesMap = ListMap(values.toSeq.sortBy(_._1): _*)
    val rowValuesItr: Iterable[Double] = sortedValuesMap.values

    var positionsArray: ArrayBuffer[Int] = ArrayBuffer[Int]()
    var valuesArray: ArrayBuffer[Double] = ArrayBuffer[Double]()
    var currentPosition: Int = 0
    rowValuesItr.foreach
    {
      kv =>
        if (kv > 0)
        {
          valuesArray += kv;
          positionsArray += currentPosition;
        }
        currentPosition = currentPosition + 1;
    }

    // the first argument to Vectors.sparse is the total number of features,
    // not the number of non-zero entries
    val lp: LabeledPoint = new LabeledPoint(label, org.apache.spark.mllib.linalg.Vectors.sparse(fieldNameSeq.size, positionsArray.toArray, valuesArray.toArray))

    return lp

  }
  catch
  {
    case ex: Exception =>
    {
      throw new Exception(ex)
    }
  }
}

Problem: So I tried to create a DataFrame of labeled points that could easily be converted to an RDD.

val labeled:DataFrame = mDFFinal.map(row => convertRowToLabeledPoint(row,fieldNameSeq,row.getAs("Majors_Final_Indexed"))).toDF()

But I get the following error:

SparkTest.scala:285: error: Unable to find encoder for type stored in a Dataset. Primitive types (Int, String, etc) and Product types (case classes) are supported by importing spark.implicits._ Support for serializing other types will be added in future releases. [INFO] val labeled:DataFrame = mDFFinal.map(row => convertRowToLabeledPoint(row,fieldNameSeq,row.getAs("Majors_Final_Indexed"))).toDF()
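The message points at a missing Encoder for LabeledPoint. A possible workaround, which I have not verified, would be to supply an explicit Kryo-based encoder before the map (the lpEncoder name below is just for illustration):

  import org.apache.spark.mllib.regression.LabeledPoint
  import org.apache.spark.sql.{Encoder, Encoders}

  // Kryo-serialized encoder so that a Dataset[LabeledPoint] can be created at all.
  // Note: the resulting Dataset has a single binary column, so it is mainly useful
  // as a stepping stone towards an RDD, and convertRowToLabeledPoint (and the class
  // it lives in) must be serializable for the map to run on the executors.
  implicit val lpEncoder: Encoder[LabeledPoint] = Encoders.kryo[LabeledPoint]

  val labeled = mDFFinal.map(row =>
    convertRowToLabeledPoint(row, fieldNameSeq, row.getAs[Int]("Majors_Final_Indexed")))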

OK, so I skipped the DataFrame and instead built an Array of LabeledPoints, which converts to an RDD easily enough. The rest is straightforward.

I stress that while this works, I am new to Scala, so there may well be more efficient ways of doing this (one possible alternative is sketched after the code below).

The main function is as follows:

  val mDF: DataFrame = spark.read.option("header", "true").option("inferSchema", "true").csv("src/test/resources/knimeMergedTRimmedVariables.csv")
  val mDFTyped = castAllTypedColumnsTo(mDF, IntegerType, DoubleType)

  val indexer = new StringIndexer()
    .setInputCol("Majors_Final")
    .setOutputCol("Majors_Final_Indexed")
  val mDFTypedIndexed = indexer.fit(mDFTyped).transform(mDFTyped)
  val mDFFinal = castColumnTo(mDFTypedIndexed,"Majors_Final_Indexed", IntegerType)

  mDFFinal.show()
  //only doubles accepted by sparse vector, so that's what we filter for
  val fieldSeq: scala.collection.Seq[StructField] = mDFFinal.schema.fields.toSeq.filter(f => f.dataType == DoubleType)
  val fieldNameSeq: Seq[String] = fieldSeq.map(f => f.name)

  // collect the rows and convert each one to a LabeledPoint
  val labeledPoints: ArrayBuffer[LabeledPoint] = ArrayBuffer[LabeledPoint]()

  mDFFinal.collect().foreach { row =>
    labeledPoints += convertRowToLabeledPoint(row, fieldNameSeq, row.getAs("Majors_Final_Indexed"))
  }

  val mRdd: RDD[LabeledPoint] = spark.sparkContext.parallelize(labeledPoints.toSeq)

  MLUtils.saveAsLibSVMFile(mRdd, "./output/libsvm")
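
A possibly more efficient variant (an untested sketch on my part) would skip the collect() and build the RDD of LabeledPoints on the executors directly:

  // untested sketch: convert the rows on the executors instead of collecting
  // them to the driver first; convertRowToLabeledPoint (and the class it lives in)
  // must be serializable for this to work
  val mRddAlt: RDD[LabeledPoint] = mDFFinal.rdd.map { row =>
    convertRowToLabeledPoint(row, fieldNameSeq, row.getAs[Int]("Majors_Final_Indexed"))
  }

  MLUtils.saveAsLibSVMFile(mRddAlt, "./output/libsvm")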