如何使用 Play Iteratees 为过程的每个步骤逐块读取和处理文件

How to read and process a file chunk by chunk for each step of the process using Play Iteratees

我正在使用播放框架 Iteratee 来读取文件。我想逐块处理这个文件(每一步)。


要定义 groupByLines,我需要使用 Iteratee.fold 将前一个块的最后一行与当前块的第一行连接起来。


但我想逐块处理文件。我的意思是 groupByLines 应该产生 200 行的块(例如)。

turnIntoLine 也出现同样的问题。我还使用 fold 来创建线条。我需要使用一个累加器(由fold提供)来压缩行号和行内容。

我是 play iteratee 的初学者。


val chunkSize = 1024 * 8

val enumerator: Enumerator[Array[Byte]] = Enumerator.fromFile(file, chunkSize)

def isLastChunk(chunk: Array[Byte]): Boolean = {
  chunk.length < chunkSize

val groupByLines: Enumeratee[Array[Byte], List[String]] = Enumeratee.grouped {
  Iteratee.fold[Array[Byte], (String, List[String])]("", List.empty) {
    case ((accLast, accLines), chunk) =>
      println("groupByLines chunk size " + chunk.length)
      new String(chunk)
        .toList match {
        case lines  @ Cons(h, tail) =>
          val lineBetween2Chunks: String = accLast + h

          val goodLines =
            isLastChunk(chunk) match {
              case true  => Cons(lineBetween2Chunks, tail)
              case false => Cons(lineBetween2Chunks, tail).init

          (lines.last, accLines ++ goodLines)
        case Nil => ("", accLines)

val turnIntoLines: Enumeratee[List[String], List[Line]] = Enumeratee.grouped {
  Iteratee.fold[List[String], (Int, List[Line])](0, List.empty) {
    case ((index, accLines), chunk) =>
      println("turnIntoLines chunk size " + chunk.length)
      val lines =
        ((Stream from index) zip chunk).map {
          case (lineNumber, content) => Line(lineNumber, content)
      (index + chunk.length, lines ++ accLines)

这里的问题是,如何使用 Play Iteratees 逐行处理文件。

首先,为了 使用 UTF-8 读取文件,我使用了:

object EnumeratorAdditionalOperators {
  implicit def enumeratorAdditionalOperators(e: Enumerator.type): EnumeratorAdditionalOperators = new EnumeratorAdditionalOperators(e)

class EnumeratorAdditionalOperators(e: Enumerator.type) {

  def fromUTF8File(file: File, chunkSize: Int = 1024 * 8): Enumerator[String] =
    e.fromFile(file, chunkSize)
      .map(bytes => new String(bytes, Charset.forName("UTF-8")))



object EnumerateeAdditionalOperators {
  implicit def enumerateeAdditionalOperators(e: Enumeratee.type): EnumerateeAdditionalOperators = new EnumerateeAdditionalOperators(e)

class EnumerateeAdditionalOperators(e: Enumeratee.type) {

  def splitToLines: Enumeratee[String, String] = e.grouped(
    Traversable.splitOnceAt[String,Char](_ != '\n')  &>>



class EnumerateeAdditionalOperators(e: Enumeratee.type) {

    * As a complement to [[play.api.libs.iteratee.Enumeratee.heading]] and [[play.api.libs.iteratee.Enumeratee.trailing]], allows for inclusion of arbitrary elements between those from the producer.
  def joining[E](separators: Enumerator[E])(implicit ec: ExecutionContext): Enumeratee[E, E] =
    zipWithIndex[E] compose Enumeratee.mapInputFlatten[(E, Int)] {

      case Input.Empty =>

      case Input.El((element, index)) if 0 < index =>
        separators andThen Enumerator(element)

      case Input.El((element, _)) =>

      case Input.EOF =>


    * Zips elements with an index of the given [[scala.math.Numeric]] type, stepped by the given function.
    * (Special thanks to [[https://github.com/eecolor EECOLOR]] for inspiring this factory with his answer to [[ a question about enumeratees on Stack Overflow]].)
  def zipWithIndex[E, I](first: I, step: I => I)(implicit ev: Numeric[I]): Enumeratee[E, (E, I)] =
    e.scanLeft[E](null.asInstanceOf[E] -> ev.minus(first, step(ev.zero))) {
      case ((_, index), value) =>
        value -> step(index)

    * Zips elements with an incrementing index of the given [[scala.math.Numeric]] type, adding one each time.
  def zipWithIndex[E, I](first: I)(implicit ev: Numeric[I]): Enumeratee[E, (E, I)] = zipWithIndex(first, ev.plus(_, ev.one))

    * Zips elements with an incrementing index by the same contract [[scala.collection.GenIterableLike#zipWithIndex zipWithIndex]].
  def zipWithIndex[E]: Enumeratee[E, (E, Int)] = zipWithIndex(0)

  // ...


请注意,我将 "add" 方法的隐式定义为 EnumeratorEnumeratee。这个技巧可以写成这样:Enumerator.fromUTF8File(file).


case class Line(number: Int, value: String)

Enumerator.fromUTF8File(file) &>
Enumeratee.splitToLines ><>
Enumeratee.zipWithIndex ><> Enumeratee.map{
  case (e, idx) => Line(idx, e)
} // then an Iteratee or another Enumeratee
