包含免费 Monad 的文件 I/O
File I/O with Free Monads
我有一个 CSV 文件,需要对其进行解析并对每条记录执行一些操作。我如何使用 Free Monads 呢?目前,我正在将整个文件加载到内存中,想知道是否有更好的解决方案。下面是我的程序:
for {
reader <- F.getReader("my_file.csv")
csvRecords <- C.readCSV(reader)
_ <- I.processCSV(csvRecords)
_ <- F.close(reader)
} yield()
此代码适用于较小的文件,但如果我有非常大的文件(超过 1 GB),则效果不佳。我正在使用 Commons CSV 读取 CSVRecord。
下面是我如何使用 CSV 文件读取并对其进行一些操作的方式 -
我用 scala.io.Source.fromFile()
我创建了一个 case class
header
类型的 CSV 文件,以使数据更易于访问和操作。
PS:我对monad一无所知,而且我是Scala的初学者。我发布了这篇文章,因为它可能会有帮助。
case class AirportData(id:Int, ident:String, name:String, typeAirport:String, latitude_deg:Double,
longitude_deg:Double, elevation_ft:Double, continent:String, iso_country:String, iso_region:String,
municipality:String)
object AirportData extends App {
def toDoubleOrNeg(s: String): Double = {
try {
s.toDouble
} catch {
case _: NumberFormatException => -1
}
}
val source = scala.io.Source.fromFile("resources/airportData/airports.csv")
val lines = source.getLines().drop(1)
val data = lines.flatMap { line =>
val p = line.split(",")
Seq(AirportData(p(0).toInt, p(1).toString, p(2).toString, p(3).toString, toDoubleOrNeg(p(4)), toDoubleOrNeg(p(5)),
toDoubleOrNeg(p(6)), p(7).toString, p(8).toString, p(9).toString, p(10).toString))
}.toArray
source.close()
println(data.length)
data.take(10) foreach println
}
查看要点中的代码,我认为带有注释的行正是您根本不需要的行:
object CSVIOInterpreter extends (CSVIO ~> Future) {
import scala.collection.JavaConverters._
override def apply[A](fa: CSVIO[A]): Future[A] = fa match {
case ReadCSV(reader) => Future.fromTry(Try {
CSVFormat.RFC4180
.withFirstRecordAsHeader()
.parse(reader)
.getRecords // Loads the complete file
.iterator().asScala.toStream
})
}
}
只需删除整个 getRecords
行。 CSVFormat.parse
returns an instance of CSVParser 已经实现了 Iterable<CSVRecord>
。 getRecords
调用是唯一强制它读取整个文件的方法。
实际上你可以看到 CSVParser.getRecords 实现,它是
public List<CSVRecord> getRecords() throws IOException {
CSVRecord rec;
final List<CSVRecord> records = new ArrayList<>();
while ((rec = this.nextRecord()) != null) {
records.add(rec);
}
return records;
}
所以它只是使用 this.nextRecord
调用具体化整个文件,这显然是 API.
的更多 "core" 部分
因此,当我在不使用 getRecords
调用的情况下对您的代码进行简化后:
import cats._
import cats.free.Free
import java.io._
import org.apache.commons.csv._
import scala.collection.JavaConverters._
trait Action[A] {
def run(): A
}
object F {
import Free.liftF
case class GetReader(fileName: String) extends Action[Reader] {
override def run(): Reader = new FileReader(fileName)
}
case class CloseReader(reader: Reader) extends Action[Unit] {
override def run(): Unit = reader.close()
}
def getReader(fileName: String): Free[Action, Reader] = liftF(GetReader(fileName))
def close(reader: Reader): Free[Action, Unit] = liftF(CloseReader(reader))
}
object C {
import Free.liftF
case class ReadCSV(reader: Reader) extends Action[CSVParser] {
override def run(): CSVParser = CSVFormat.DEFAULT.parse(reader)
}
def readCSV(reader: Reader): Free[Action, CSVParser] = liftF(ReadCSV(reader))
}
object I {
import Free.liftF
case class ProcessCSV(parser: CSVParser) extends Action[Unit] {
override def run(): Unit = {
for (r <- parser.asScala)
println(r)
}
}
def processCSV(parser: CSVParser): Free[Action, Unit] = liftF(ProcessCSV(parser))
}
object Runner {
import cats.arrow.FunctionK
import cats.{Id, ~>}
val runner = new (Action ~> Id) {
def apply[A](fa: Action[A]): Id[A] = fa.run()
}
def run[A](free: Free[Action, A]): A = {
free.foldMap(runner)
}
}
def test() = {
val free = for {
// reader <- F.getReader("my_file.csv")
reader <- F.getReader("AssetsImportCompleteSample.csv")
csvRecords <- C.readCSV(reader)
_ <- I.processCSV(csvRecords)
_ <- F.close(reader)
} yield ()
Runner.run(free)
}
它似乎在逐行模式下工作正常。
我有一个 CSV 文件,需要对其进行解析并对每条记录执行一些操作。我如何使用 Free Monads 呢?目前,我正在将整个文件加载到内存中,想知道是否有更好的解决方案。下面是我的程序:
for {
reader <- F.getReader("my_file.csv")
csvRecords <- C.readCSV(reader)
_ <- I.processCSV(csvRecords)
_ <- F.close(reader)
} yield()
此代码适用于较小的文件,但如果我有非常大的文件(超过 1 GB),则效果不佳。我正在使用 Commons CSV 读取 CSVRecord。
下面是我如何使用 CSV 文件读取并对其进行一些操作的方式 -
我用 scala.io.Source.fromFile()
我创建了一个 case class
header
类型的 CSV 文件,以使数据更易于访问和操作。
PS:我对monad一无所知,而且我是Scala的初学者。我发布了这篇文章,因为它可能会有帮助。
case class AirportData(id:Int, ident:String, name:String, typeAirport:String, latitude_deg:Double,
longitude_deg:Double, elevation_ft:Double, continent:String, iso_country:String, iso_region:String,
municipality:String)
object AirportData extends App {
def toDoubleOrNeg(s: String): Double = {
try {
s.toDouble
} catch {
case _: NumberFormatException => -1
}
}
val source = scala.io.Source.fromFile("resources/airportData/airports.csv")
val lines = source.getLines().drop(1)
val data = lines.flatMap { line =>
val p = line.split(",")
Seq(AirportData(p(0).toInt, p(1).toString, p(2).toString, p(3).toString, toDoubleOrNeg(p(4)), toDoubleOrNeg(p(5)),
toDoubleOrNeg(p(6)), p(7).toString, p(8).toString, p(9).toString, p(10).toString))
}.toArray
source.close()
println(data.length)
data.take(10) foreach println
}
查看要点中的代码,我认为带有注释的行正是您根本不需要的行:
object CSVIOInterpreter extends (CSVIO ~> Future) {
import scala.collection.JavaConverters._
override def apply[A](fa: CSVIO[A]): Future[A] = fa match {
case ReadCSV(reader) => Future.fromTry(Try {
CSVFormat.RFC4180
.withFirstRecordAsHeader()
.parse(reader)
.getRecords // Loads the complete file
.iterator().asScala.toStream
})
}
}
只需删除整个 getRecords
行。 CSVFormat.parse
returns an instance of CSVParser 已经实现了 Iterable<CSVRecord>
。 getRecords
调用是唯一强制它读取整个文件的方法。
实际上你可以看到 CSVParser.getRecords 实现,它是
public List<CSVRecord> getRecords() throws IOException {
CSVRecord rec;
final List<CSVRecord> records = new ArrayList<>();
while ((rec = this.nextRecord()) != null) {
records.add(rec);
}
return records;
}
所以它只是使用 this.nextRecord
调用具体化整个文件,这显然是 API.
因此,当我在不使用 getRecords
调用的情况下对您的代码进行简化后:
import cats._
import cats.free.Free
import java.io._
import org.apache.commons.csv._
import scala.collection.JavaConverters._
trait Action[A] {
def run(): A
}
object F {
import Free.liftF
case class GetReader(fileName: String) extends Action[Reader] {
override def run(): Reader = new FileReader(fileName)
}
case class CloseReader(reader: Reader) extends Action[Unit] {
override def run(): Unit = reader.close()
}
def getReader(fileName: String): Free[Action, Reader] = liftF(GetReader(fileName))
def close(reader: Reader): Free[Action, Unit] = liftF(CloseReader(reader))
}
object C {
import Free.liftF
case class ReadCSV(reader: Reader) extends Action[CSVParser] {
override def run(): CSVParser = CSVFormat.DEFAULT.parse(reader)
}
def readCSV(reader: Reader): Free[Action, CSVParser] = liftF(ReadCSV(reader))
}
object I {
import Free.liftF
case class ProcessCSV(parser: CSVParser) extends Action[Unit] {
override def run(): Unit = {
for (r <- parser.asScala)
println(r)
}
}
def processCSV(parser: CSVParser): Free[Action, Unit] = liftF(ProcessCSV(parser))
}
object Runner {
import cats.arrow.FunctionK
import cats.{Id, ~>}
val runner = new (Action ~> Id) {
def apply[A](fa: Action[A]): Id[A] = fa.run()
}
def run[A](free: Free[Action, A]): A = {
free.foldMap(runner)
}
}
def test() = {
val free = for {
// reader <- F.getReader("my_file.csv")
reader <- F.getReader("AssetsImportCompleteSample.csv")
csvRecords <- C.readCSV(reader)
_ <- I.processCSV(csvRecords)
_ <- F.close(reader)
} yield ()
Runner.run(free)
}
它似乎在逐行模式下工作正常。