Akka 流不处理我的 CSV 文件中的所有行

Akka stream not processing all the lines from my CSV file

我正在使用 Akka 流处理包含 1839 行的 CSV 文件。我添加了计数器来计算处理的行数。

这是我的来源,我确保输入文件中的每一行都少于 700 个字符。

case class ParsedLine(input: String, field1: String, field2: String, field3: String)


val counter0 = new AtomicInteger()
val counter1 = new AtomicInteger()

val lineSource = FileIO
    .fromPath(Paths.get(InputFile))
    .via(Framing.delimiter(ByteString("\n"), 1024, allowTruncation = true))
    .map { l =>
      counter0.incrementAndGet()
      l.utf8String
    }

 val parseLine = Flow[String].map { l =>
     val words = l.split(",")
     ParsedLine(l, words(0), words(1), words(2))
 }

这个源码处理如下,对应源码中的每一行,输出中应该有一个处理过的行。

val done = lineSource
    .via(parseLine)
    .to(Sink.foreach(_.input))
    .run()


  done.onComplete {
    case Success(_) =>
      println("Counter0 " + counter0.get())
      println("Counter1 " + counter1.get())
      system.terminate()
    case Failure(e) =>
      println(e.getLocalizedMessage)
      system.terminate()
  }

有趣的是计数器打印如下&每次我得到不同的数字。如果我删除 .to(Sink.foreach(_.input)) 行,我得到的计数为 1839.

Counter0 1445
Counter1 1667

首先我希望 Counter0 的值比 Counter1 高,因为 Counter0 在 Counter1 之前的阶段,我希望所有的行都被处理并且计数器应该打印总行数 1839。

知道这种情况下发生了什么吗? akka 流是否在两者之间丢弃项目?

您实际上并不是在等待直播结束。

您正在使用 to 附加 Sink.foreach(...),这会丢弃 Sink.foreach 阶段的处理细节,仅保留早期阶段的处理阶段。

此外,请记住,您在每一步都在做同样的事情(viamapvia,然后是 to)。因此,您只跟踪由 FileIO.from(...) 创建的第一个图形步骤的处理阶段。这意味着您只是在等待读取完整文件,而不是等待任何后续处理步骤。

您只需要保留两者的结果并等待它们完成即可。

val stream =
 lineSource
   .via(parseLine)
   .toMat(Sink.foreach(_.input))(Keep.both)

val resultFutures: (Future[IOResult], Future[Done]) = stream.run()

val resultsFuture = Future.sequence(List(resultFutures._1, resultFutures._2))

resultsFuture.onComplete {
  case Success(List(ioResult, done)) =>
    println(ioResult)
    println(done)
    println(counter0.get())
    actorSystem.terminate()
  case Failure(e) =>
    println(e.getLocalizedMessage)
    actorSystem.terminate()
}

或者,您可以选择仅跟踪最后一个处理阶段(在本例中为 Sink.foreach(...)

val stream =
  lineSource
    .via(parseLine)
    .toMat(Sink.foreach(_.input))(Keep.right)

val resuleFuture: Future[Done] = stream.run()

resuleFuture.onComplete({
  case Success(_) =>
    println("Counter0 " + counter0.get())
    actorSystem.terminate()
  case Failure(e) =>
    println(e.getLocalizedMessage)
    actorSystem.terminate()
})