如何使用模式解析带有 dataVec 的 CSV 文件?

how to parse a CSV file with dataVec using a schema?

我正在尝试使用 canova/datavec 加载 CSV 数据集,但找不到 "idiomatic" 的方法。我有点挣扎,因为我觉得框架在进化,这让我很难确定什么是相关的,什么不是。

object S extends App{
  val recordReader:RecordReader = new CSVRecordReader(0, ",")
  recordReader.initialize(new FileSplit(new File("./src/main/resources/CSVdataSet.csv")))
  val iter:DataSetIterator = new RecordReaderDataSetIterator(recordReader, 100)
  while(iter.hasNext){
    println(iter.next())
  }
}

我有一个以 header 描述开头的 csv 文件,因此我的输出是一个例外

(java.lang.NumberFormatException: For input string: "iid":)

我开始研究模式构建器,因为 schema/the header 我得到了一个异常。所以我想添加这样的架构;

val schema = new Schema.Builder()
    .addColumnInteger("iid")
    .build()   

从我的角度来看,noob-view、BasicDataVec-examples 并不完全清楚,因为它们 link 会产生火花等。来自 IrisAnalysisExample (https://github.com/deeplearning4j/dl4j-examples/blob/master/datavec-examples/src/main/java/org/datavec/transform/analysis/IrisAnalysis.java) . 我假设文件内容首先被读入 JavaRDD(可能是 Stream),然后再进行处理。除 DataAnalysis 外,不使用该架构。

所以,有人可以帮助我理解我是如何解析的(作为流或迭代器,第一行带有 header 描述的 CSV-file 吗?

我从他们的书(深入 learning:A 从业者方法)中了解到,数据转换(使用模式)需要 spark。因此,我将代码重写为;

object S extends App{
  val schema: Schema = new Schema.Builder()
    .addColumnInteger("iid")
    .build
  val recordReader = new CSVRecordReader(0, ",")
  val f = new File("./src/main/resources/CSVdataSet.csv")
  recordReader.initialize(new FileSplit(f))
  val sparkConf:SparkConf = new SparkConf()
  sparkConf.setMaster("local[*]");
  sparkConf.setAppName("DataVec Example");
  val sc:JavaSparkContext = new JavaSparkContext(sparkConf)
  val lines = sc.textFile(f.getAbsolutePath);
  val examples = lines.map(new StringToWritablesFunction(new CSVRecordReader()))
  val process = new TransformProcess.Builder(schema).build()
  val executor = new SparkTransformExecutor()
  val processed = executor.execute(examples, process)
  println(processed.first())
}

我现在认为模式会指示我只有 iid-column,但输出是:

[iid, id, gender, idg, .....]

回答我自己的问题可能被认为是不好的做法,但我会保留我的问题(现在回答)一段时间,看看它是否对其他人有用。

我了解如何在数据上使用模式,我可以在其中为所有功能创建相应的模式属性。我最初想处理每个向量中具有 200 多个特征值的数据集。必须为所有 200 个特性声明一个包含列属性的静态模式,这使得使用起来不切实际。但是,可能有一种更动态的创建模式的方法,我只是还没有找到。我决定在 Iris.csv 数据集上测试我的代码。这里的文件包含行属性;

Id,SepalLengthCm,SepalWidthCm,PetalLengthCm,PetalWidthCm,Species 

将作为模式实施的:

 val schema: Schema = new Schema.Builder()
    .addColumnInteger("Id")
    .addColumnDouble("SepalLengthCm")
    .addColumnDouble("SepalWidthCm")
    .addColumnDouble("PetalLengthCm")
    .addColumnDouble("PetalWidthCm")
    .addColumnString("Species")
    .build   

我觉得使用模式背后的动机之一是能够转换数据。因此,我想执行转换操作。 TransformProcess 定义了对我们的数据执行的一系列操作(使用 DataVec 附录 F 第 405 页深度学习:实践者方法)。

A TransformProcess is constructed by specifying two things:
   • The Schema of the initial input data
   • The set of operations we wish to execute Using DataVec

我决定看看是否可以从读取的数据中删除一列:

val process = new TransformProcess.Builder(schema)
  .removeColumns("Id")
  .build()

因此,我的代码变成了:

import org.datavec.api.records.reader.impl.csv.CSVRecordReader
import org.datavec.api.transform.{DataAction, TransformProcess}
import org.datavec.api.transform.schema.Schema
import java.io.File
import org.apache.spark.api.java.JavaSparkContext
import org.datavec.spark.transform.misc.StringToWritablesFunction
import org.apache.spark.SparkConf
import org.datavec.api.split.FileSplit
import org.datavec.spark.transform.SparkTransformExecutor

object S extends App{
   val schema: Schema = new Schema.Builder()
      .addColumnInteger("Id")
      .addColumnDouble("SepalLengthCm")
      .addColumnDouble("SepalWidthCm")
      .addColumnDouble("PetalLengthCm")
      .addColumnDouble("PetalWidthCm")
      .addColumnString("Species")
      .build

  val recordReader = new CSVRecordReader(0, ",")
  val f = new File("./src/main/resources/Iris.csv")
  recordReader.initialize(new FileSplit(f))
  println(recordReader.next())
  val sparkConf:SparkConf = new SparkConf()
  sparkConf.setMaster("local[*]");
  sparkConf.setAppName("DataVec Example");
  val sc:JavaSparkContext = new JavaSparkContext(sparkConf)
  val lines = sc.textFile(f.getAbsolutePath);
  val examples = lines.map(new StringToWritablesFunction(new CSVRecordReader()))
  val process = new TransformProcess.Builder(schema)
      .removeColumns("Id")
      .build()
  val executor = new SparkTransformExecutor()
  val processed = executor.execute(examples, process)
  println(processed.first())
}

第一个打印:

[Id, SepalLengthCm, SepalWidthCm, PetalLengthCm, PetalWidthCm, Species]

第二次印刷

[SepalLengthCm, SepalWidthCm, PetalLengthCm, PetalWidthCm, Species]

编辑:我发现我遇到了崩溃 "org.deeplearning4j" % "deeplearning4j-core" % "0.6.0" 作为我的 libraryDependency

虽然有一个旧的依赖它工作

"org.deeplearning4j" % "deeplearning4j-core" % "0.0.3.2.7"

libraryDependencies ++= Seq(
  "org.datavec" % "datavec-spark_2.11" % "0.5.0",
  "org.datavec" % "datavec-api" % "0.5.0",
  "org.deeplearning4j" % "deeplearning4j-core" % "0.0.3.2.7"
  //"org.deeplearning4j" % "deeplearning4j-core" % "0.6.0"
)