How to read and process a file chunk by chunk for each step of the process using Play Iteratees
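I am using Play Framework Iteratees to read a file, and I want to process this file chunk by chunk at every step.
I wrote the following steps:
groupByLines: Enumeratee[Array[Byte], List[String]]
turnIntoLines: Enumeratee[List[String], List[Line]] (where I defined case class Line(number: Int, value: String))
parseChunk: Iteratee[List[Line], Try[List[T]]] (for example, CSV parsing)
To define groupByLines, I need to use Iteratee.fold to concatenate the last line of the previous chunk with the first line of the current chunk.
The problem is that this creates a single chunk containing all the lines of the file.
But I want to process the file chunk by chunk. I mean that groupByLines should produce chunks of, for example, 200 lines.
The same problem occurs with turnIntoLines. I also use fold there to create the Line values, because I need an accumulator (provided by fold) to zip the line numbers with the line contents.
I am a beginner with Play Iteratees.
Here is my code: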
    val chunkSize = 1024 * 8

    val enumerator: Enumerator[Array[Byte]] = Enumerator.fromFile(file, chunkSize)

    def isLastChunk(chunk: Array[Byte]): Boolean = {
      chunk.length < chunkSize
    }

    val groupByLines: Enumeratee[Array[Byte], List[String]] = Enumeratee.grouped {
      println("groupByLines")
      Iteratee.fold[Array[Byte], (String, List[String])]("", List.empty) {
        case ((accLast, accLines), chunk) =>
          println("groupByLines chunk size " + chunk.length)
          new String(chunk)
            .trim
            .split("\n")
            .toList match {
              case lines @ Cons(h, tail) =>
                val lineBetween2Chunks: String = accLast + h
                val goodLines =
                  isLastChunk(chunk) match {
                    case true  => Cons(lineBetween2Chunks, tail)
                    case false => Cons(lineBetween2Chunks, tail).init
                  }
                (lines.last, accLines ++ goodLines)
              case Nil => ("", accLines)
            }
      }.map(_._2)
    }

    val turnIntoLines: Enumeratee[List[String], List[Line]] = Enumeratee.grouped {
      println("turnIntoLines")
      Iteratee.fold[List[String], (Int, List[Line])](0, List.empty) {
        case ((index, accLines), chunk) =>
          println("turnIntoLines chunk size " + chunk.length)
          val lines =
            ((Stream from index) zip chunk).map {
              case (lineNumber, content) => Line(lineNumber, content)
            }.toList
          (index + chunk.length, lines ++ accLines)
      }.map(_._2)
    }
The question here is how to process a file line by line using Play Iteratees.
First, to read the file as UTF-8, I used:
    object EnumeratorAdditionalOperators {
      implicit def enumeratorAdditionalOperators(e: Enumerator.type): EnumeratorAdditionalOperators =
        new EnumeratorAdditionalOperators(e)
    }

    class EnumeratorAdditionalOperators(e: Enumerator.type) {
      def fromUTF8File(file: File, chunkSize: Int = 1024 * 8): Enumerator[String] =
        e.fromFile(file, chunkSize)
          .map(bytes => new String(bytes, Charset.forName("UTF-8")))
    }
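One caveat with decoding each chunk on its own: a multi-byte UTF-8 character can straddle two chunks, and `new String(bytes, ...)` then turns both halves into replacement characters. The following is a minimal sketch, using only the JDK, of a decoder that keeps the incomplete trailing bytes for the next chunk. `Utf8ChunkDecoder` is a hypothetical helper name, not part of Play, and plugging it into an Enumeratee is left out here.

```scala
import java.nio.{ByteBuffer, CharBuffer}
import java.nio.charset.{CharsetDecoder, CodingErrorAction, StandardCharsets}

/** Hypothetical helper: decodes UTF-8 chunk by chunk, carrying over cut-off characters. */
final class Utf8ChunkDecoder {
  private val decoder: CharsetDecoder =
    StandardCharsets.UTF_8.newDecoder()
      .onMalformedInput(CodingErrorAction.REPLACE)
      .onUnmappableCharacter(CodingErrorAction.REPLACE)

  // Bytes of a character that was cut off at the end of the previous chunk.
  private var leftover: Array[Byte] = Array.empty[Byte]

  /** Decodes one chunk; pass endOfInput = true for the last chunk only. */
  def decode(chunk: Array[Byte], endOfInput: Boolean = false): String = {
    val in  = ByteBuffer.wrap(leftover ++ chunk)
    // UTF-8 never produces more chars than it consumes bytes.
    val out = CharBuffer.allocate(in.remaining() + 1)
    decoder.decode(in, out, endOfInput)
    if (endOfInput) decoder.flush(out)
    // Whatever the decoder did not consume is an incomplete character.
    leftover = new Array[Byte](in.remaining())
    in.get(leftover)
    out.flip()
    out.toString
  }
}
```

The decoder is stateful, so one instance should be used per file and its `decode` calls must follow the order of the chunks.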
Then, to split the input chunks into lines (cutting at '\n'):
    object EnumerateeAdditionalOperators {
      implicit def enumerateeAdditionalOperators(e: Enumeratee.type): EnumerateeAdditionalOperators =
        new EnumerateeAdditionalOperators(e)
    }

    class EnumerateeAdditionalOperators(e: Enumeratee.type) {
      def splitToLines: Enumeratee[String, String] = e.grouped(
        Traversable.splitOnceAt[String,Char](_ != '\n') &>>
          Iteratee.consume()
      )
    }
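To make the intended behaviour of this step concrete, here is a sketch in plain Scala, without Play, of splitting a stream of text chunks into lines while carrying the unfinished last line over to the next chunk. `LineSplitter` and its methods are hypothetical names for illustration, and the sketch assumes '\n' line endings only (no '\r\n' handling).

```scala
object LineSplitter {
  /** Splits `carry + chunk` on '\n'; returns the complete lines and the trailing partial line. */
  def split(carry: String, chunk: String): (List[String], String) = {
    // Limit -1 keeps trailing empty strings, so `parts` is never empty
    // and its last element is always the (possibly empty) unfinished line.
    val parts = (carry + chunk).split("\n", -1).toList
    (parts.init, parts.last)
  }

  /** Lazily turns text chunks into lines, one chunk at a time. */
  def lines(chunks: Iterator[String]): Iterator[String] = {
    var carry = ""
    val complete = chunks.flatMap { chunk =>
      val (ls, rest) = split(carry, chunk)
      carry = rest
      ls
    }
    // `++` takes its argument by name, so `carry` is read only after all chunks are consumed.
    complete ++ (if (carry.nonEmpty) Iterator.single(carry) else Iterator.empty)
  }
}
```

Unlike the fold in the question, nothing here accumulates the whole file: only the current chunk and the partial line are held in memory.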
Third, to add line numbers, I used a piece of code found here: https://github.com/michaelahlers/michaelahlers-playful/blob/master/src/main/scala/ahlers/michael/playful/iteratee/EnumerateeFactoryOps.scala
    class EnumerateeAdditionalOperators(e: Enumeratee.type) {

      /**
        * As a complement to [[play.api.libs.iteratee.Enumeratee.heading]] and [[play.api.libs.iteratee.Enumeratee.trailing]], allows for inclusion of arbitrary elements between those from the producer.
        */
      def joining[E](separators: Enumerator[E])(implicit ec: ExecutionContext): Enumeratee[E, E] =
        zipWithIndex[E] compose Enumeratee.mapInputFlatten[(E, Int)] {
          case Input.Empty =>
            Enumerator.enumInput[E](Input.Empty)
          case Input.El((element, index)) if 0 < index =>
            separators andThen Enumerator(element)
          case Input.El((element, _)) =>
            Enumerator(element)
          case Input.EOF =>
            Enumerator.enumInput[E](Input.EOF)
        }

      /**
        * Zips elements with an index of the given [[scala.math.Numeric]] type, stepped by the given function.
        *
        * (Special thanks to [[https://github.com/eecolor EECOLOR]] for inspiring this factory with his answer to [[ a question about enumeratees on Stack Overflow]].)
        */
      def zipWithIndex[E, I](first: I, step: I => I)(implicit ev: Numeric[I]): Enumeratee[E, (E, I)] =
        e.scanLeft[E](null.asInstanceOf[E] -> ev.minus(first, step(ev.zero))) {
          case ((_, index), value) =>
            value -> step(index)
        }

      /**
        * Zips elements with an incrementing index of the given [[scala.math.Numeric]] type, adding one each time.
        */
      def zipWithIndex[E, I](first: I)(implicit ev: Numeric[I]): Enumeratee[E, (E, I)] = zipWithIndex(first, ev.plus(_, ev.one))

      /**
        * Zips elements with an incrementing index by the same contract [[scala.collection.GenIterableLike#zipWithIndex zipWithIndex]].
        */
      def zipWithIndex[E]: Enumeratee[E, (E, Int)] = zipWithIndex(0)

      // ...
    }
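The indexing contract can be illustrated without Play. Note that the seed `first - step(0)` used above only yields `first` for the first element when `step` is additive (such as `_ + 1`). The sketch below pairs elements with `Iterator.iterate(first)(step)` instead, which works for any step function. `Indexing.zipWithIndexFrom` is a hypothetical name, and this is a plain-iterator analogue, not the library's implementation.

```scala
object Indexing {
  /** Pairs each element with first, step(first), step(step(first)), ... lazily. */
  def zipWithIndexFrom[E](elements: Iterator[E], first: Int, step: Int => Int): Iterator[(E, Int)] =
    // The index iterator is infinite; zip stops as soon as `elements` is exhausted.
    elements.zip(Iterator.iterate(first)(step))
}
```

For example, `Indexing.zipWithIndexFrom(lines, 1, _ + 1)` would number lines starting at 1, as text editors usually do, whereas the `zipWithIndex[E]` overload above starts at 0.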
Note that I defined implicits to "add" methods to Enumerator and Enumeratee. This trick makes it possible to write, for example: Enumerator.fromUTF8File(file).
Putting it all together:
    case class Line(number: Int, value: String)

    Enumerator.fromUTF8File(file) &>
      Enumeratee.splitToLines ><>
      Enumeratee.zipWithIndex ><> Enumeratee.map {
        case (e, idx) => Line(idx, e)
      } // then an Iteratee or another Enumeratee
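The question also asked for batches of about 200 lines rather than single lines. As a sketch of that last step in plain Scala (iterators instead of Enumeratees, so it can be run without Play), numbered lines can be grouped into fixed-size batches lazily. `Batching.batches` is a hypothetical name, and `Line` is redefined here only to keep the block self-contained.

```scala
final case class Line(number: Int, value: String)

object Batching {
  /** Numbers the lines from 0 and groups them into batches of at most `batchSize`. */
  def batches(lines: Iterator[String], batchSize: Int): Iterator[List[Line]] =
    lines.zipWithIndex
      .map { case (value, idx) => Line(idx, value) }
      .grouped(batchSize) // the last batch may be shorter
      .map(_.toList)
}
```

With Play Iteratees, a similar effect is commonly obtained by appending something like `Enumeratee.grouped(Enumeratee.take[Line](200) &>> Iteratee.getChunks)` to the pipeline. That expression is an assumption based on the iteratee API as I understand it and has not been verified here.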
The new code is much more concise than the code given in the question.