Scala/Spark:将数据框中的零膨胀数据转换为 libsvm
Scala/Spark: Converting zero inflated data in dataframe to libsvm
我是 scala 的新手(通常我在 R 中这样做)
我导入了一个零膨胀的大型数据框(2000 多列,100000 多行)。
任务
将数据转换为 libsvm 格式
步骤
据我了解步骤如下
- 确保特征列设置为 DoubleType 并且 Target 是 Int
- 遍历每一行,在一个数组中保留每个 >0 的值,在另一个数组中保留其列的索引
- 转换为 RDD[LabeledPoint]
- 以 libsvm 格式保存 RDD
我卡在了 3(但也许)因为我做错了第 2 步。
这是我的代码:
主要功能:
@Test
def testSpark(): Unit =
{
try
{
var mDF: DataFrame = spark.read.option("header", "true").option("inferSchema", "true").csv("src/test/resources/knimeMergedTRimmedVariables.csv")
val mDFTyped = castAllTypedColumnsTo(mDF, IntegerType, DoubleType)
val indexer = new StringIndexer()
.setInputCol("Majors_Final")
.setOutputCol("Majors_Final_Indexed")
val mDFTypedIndexed = indexer.fit(mDFTyped).transform(mDFTyped)
val mDFFinal = castColumnTo(mDFTypedIndexed,"Majors_Final_Indexed", IntegerType)
//only doubles accepted by sparse vector, so that's what we filter for
val fieldSeq: scala.collection.Seq[StructField] = schema.fields.toSeq.filter(f => f.dataType == DoubleType)
val fieldNameSeq: Seq[String] = fieldSeq.map(f => f.name)
val labeled:DataFrame = mDFFinal.map(row => convertRowToLabeledPoint(row,fieldNameSeq,row.getAs("Majors_Final_Indexed"))).toDF()
assertTrue(true)
}
catch
{
case ex: Exception =>
{
println(s"There has been an Exception. Message is ${ex.getMessage} and ${ex}")
fail()
}
}
}
将每一行转换为 LabeledPoint:
@throws(classOf[Exception])
private def convertRowToLabeledPoint(rowIn: Row, fieldNameSeq: Seq[String], label:Int): LabeledPoint =
{
try
{
val values: Map[String, Double] = rowIn.getValuesMap(fieldNameSeq)
val sortedValuesMap = ListMap(values.toSeq.sortBy(_._1): _*)
val rowValuesItr: Iterable[Double] = sortedValuesMap.values
var positionsArray: ArrayBuffer[Int] = ArrayBuffer[Int]()
var valuesArray: ArrayBuffer[Double] = ArrayBuffer[Double]()
var currentPosition: Int = 0
rowValuesItr.foreach
{
kv =>
if (kv > 0)
{
valuesArray += kv;
positionsArray += currentPosition;
}
currentPosition = currentPosition + 1;
}
val lp:LabeledPoint = new LabeledPoint(label, org.apache.spark.mllib.linalg.Vectors.sparse(positionsArray.size,positionsArray.toArray, valuesArray.toArray))
return lp
}
catch
{
case ex: Exception =>
{
throw new Exception(ex)
}
}
}
问题
因此,我尝试创建一个可以轻松转换为 RDD 的标记点数据框。
val labeled:DataFrame = mDFFinal.map(row => convertRowToLabeledPoint(row,fieldNameSeq,row.getAs("Majors_Final_Indexed"))).toDF()
但是我得到以下错误:
SparkTest.scala:285: error: Unable to find encoder for type stored in a Dataset. Primitive types (Int, String, etc) and Product types (case classes) are supported by importing spark.implicits._ Support for seri
alizing other types will be added in future releases.
[INFO] val labeled:DataFrame = mDFFinal.map(row => convertRowToLabeledPoint(row,fieldNameSeq,row.getAs("Majors_Final_Indexed"))).toDF()
好的,所以我跳过了 DataFrame 并创建了一个 LabeledPoints 数组,它很容易转换为 RDD。剩下的就简单了。
我强调,虽然这可行,但我是 scala 的新手,可能有更有效的方法来做到这一点。
主要功能如下:
val mDF: DataFrame = spark.read.option("header", "true").option("inferSchema", "true").csv("src/test/resources/knimeMergedTRimmedVariables.csv")
val mDFTyped = castAllTypedColumnsTo(mDF, IntegerType, DoubleType)
val indexer = new StringIndexer()
.setInputCol("Majors_Final")
.setOutputCol("Majors_Final_Indexed")
val mDFTypedIndexed = indexer.fit(mDFTyped).transform(mDFTyped)
val mDFFinal = castColumnTo(mDFTypedIndexed,"Majors_Final_Indexed", IntegerType)
mDFFinal.show()
//only doubles accepted by sparse vector, so that's what we filter for
val fieldSeq: scala.collection.Seq[StructField] = mDFFinal.schema.fields.toSeq.filter(f => f.dataType == DoubleType)
val fieldNameSeq: Seq[String] = fieldSeq.map(f => f.name)
var positionsArray: ArrayBuffer[LabeledPoint] = ArrayBuffer[LabeledPoint]()
mDFFinal.collect().foreach
{
row => positionsArray+=convertRowToLabeledPoint(row,fieldNameSeq,row.getAs("Majors_Final_Indexed"));
}
val mRdd:RDD[LabeledPoint]= spark.sparkContext.parallelize(positionsArray.toSeq)
MLUtils.saveAsLibSVMFile(mRdd, "./output/libsvm")
我是 scala 的新手(通常我在 R 中这样做)
我导入了一个零膨胀的大型数据框(2000 多列,100000 多行)。
任务 将数据转换为 libsvm 格式
步骤 据我了解步骤如下
- 确保特征列设置为 DoubleType 并且 Target 是 Int
- 遍历每一行,在一个数组中保留每个 >0 的值,在另一个数组中保留其列的索引
- 转换为 RDD[LabeledPoint]
- 以 libsvm 格式保存 RDD
我卡在了 3(但也许)因为我做错了第 2 步。
这是我的代码:
主要功能:
@Test
def testSpark(): Unit =
{
try
{
var mDF: DataFrame = spark.read.option("header", "true").option("inferSchema", "true").csv("src/test/resources/knimeMergedTRimmedVariables.csv")
val mDFTyped = castAllTypedColumnsTo(mDF, IntegerType, DoubleType)
val indexer = new StringIndexer()
.setInputCol("Majors_Final")
.setOutputCol("Majors_Final_Indexed")
val mDFTypedIndexed = indexer.fit(mDFTyped).transform(mDFTyped)
val mDFFinal = castColumnTo(mDFTypedIndexed,"Majors_Final_Indexed", IntegerType)
//only doubles accepted by sparse vector, so that's what we filter for
val fieldSeq: scala.collection.Seq[StructField] = schema.fields.toSeq.filter(f => f.dataType == DoubleType)
val fieldNameSeq: Seq[String] = fieldSeq.map(f => f.name)
val labeled:DataFrame = mDFFinal.map(row => convertRowToLabeledPoint(row,fieldNameSeq,row.getAs("Majors_Final_Indexed"))).toDF()
assertTrue(true)
}
catch
{
case ex: Exception =>
{
println(s"There has been an Exception. Message is ${ex.getMessage} and ${ex}")
fail()
}
}
}
将每一行转换为 LabeledPoint:
@throws(classOf[Exception])
private def convertRowToLabeledPoint(rowIn: Row, fieldNameSeq: Seq[String], label:Int): LabeledPoint =
{
try
{
val values: Map[String, Double] = rowIn.getValuesMap(fieldNameSeq)
val sortedValuesMap = ListMap(values.toSeq.sortBy(_._1): _*)
val rowValuesItr: Iterable[Double] = sortedValuesMap.values
var positionsArray: ArrayBuffer[Int] = ArrayBuffer[Int]()
var valuesArray: ArrayBuffer[Double] = ArrayBuffer[Double]()
var currentPosition: Int = 0
rowValuesItr.foreach
{
kv =>
if (kv > 0)
{
valuesArray += kv;
positionsArray += currentPosition;
}
currentPosition = currentPosition + 1;
}
val lp:LabeledPoint = new LabeledPoint(label, org.apache.spark.mllib.linalg.Vectors.sparse(positionsArray.size,positionsArray.toArray, valuesArray.toArray))
return lp
}
catch
{
case ex: Exception =>
{
throw new Exception(ex)
}
}
}
问题 因此,我尝试创建一个可以轻松转换为 RDD 的标记点数据框。
val labeled:DataFrame = mDFFinal.map(row => convertRowToLabeledPoint(row,fieldNameSeq,row.getAs("Majors_Final_Indexed"))).toDF()
但是我得到以下错误:
SparkTest.scala:285: error: Unable to find encoder for type stored in a Dataset. Primitive types (Int, String, etc) and Product types (case classes) are supported by importing spark.implicits._ Support for seri alizing other types will be added in future releases. [INFO] val labeled:DataFrame = mDFFinal.map(row => convertRowToLabeledPoint(row,fieldNameSeq,row.getAs("Majors_Final_Indexed"))).toDF()
好的,所以我跳过了 DataFrame 并创建了一个 LabeledPoints 数组,它很容易转换为 RDD。剩下的就简单了。
我强调,虽然这可行,但我是 scala 的新手,可能有更有效的方法来做到这一点。
主要功能如下:
val mDF: DataFrame = spark.read.option("header", "true").option("inferSchema", "true").csv("src/test/resources/knimeMergedTRimmedVariables.csv")
val mDFTyped = castAllTypedColumnsTo(mDF, IntegerType, DoubleType)
val indexer = new StringIndexer()
.setInputCol("Majors_Final")
.setOutputCol("Majors_Final_Indexed")
val mDFTypedIndexed = indexer.fit(mDFTyped).transform(mDFTyped)
val mDFFinal = castColumnTo(mDFTypedIndexed,"Majors_Final_Indexed", IntegerType)
mDFFinal.show()
//only doubles accepted by sparse vector, so that's what we filter for
val fieldSeq: scala.collection.Seq[StructField] = mDFFinal.schema.fields.toSeq.filter(f => f.dataType == DoubleType)
val fieldNameSeq: Seq[String] = fieldSeq.map(f => f.name)
var positionsArray: ArrayBuffer[LabeledPoint] = ArrayBuffer[LabeledPoint]()
mDFFinal.collect().foreach
{
row => positionsArray+=convertRowToLabeledPoint(row,fieldNameSeq,row.getAs("Majors_Final_Indexed"));
}
val mRdd:RDD[LabeledPoint]= spark.sparkContext.parallelize(positionsArray.toSeq)
MLUtils.saveAsLibSVMFile(mRdd, "./output/libsvm")