Akka 流不处理我的 CSV 文件中的所有行
Akka stream not processing all the lines from my CSV file
我正在使用 Akka 流处理包含 1839 行的 CSV 文件。我添加了计数器来计算处理的行数。
这是我的来源,我确保输入文件中的每一行都少于 700 个字符。
case class ParsedLine(input: String, field1: String, field2: String, field3: String)
val counter0 = new AtomicInteger()
val counter1 = new AtomicInteger()
val lineSource = FileIO
.fromPath(Paths.get(InputFile))
.via(Framing.delimiter(ByteString("\n"), 1024, allowTruncation = true))
.map { l =>
counter0.incrementAndGet()
l.utf8String
}
val parseLine = Flow[String].map { l =>
val words = l.split(",")
ParsedLine(l, words(0), words(1), words(2))
}
这个源码处理如下,对应源码中的每一行,输出中应该有一个处理过的行。
val done = lineSource
.via(parseLine)
.to(Sink.foreach(_.input))
.run()
done.onComplete {
case Success(_) =>
println("Counter0 " + counter0.get())
println("Counter1 " + counter1.get())
system.terminate()
case Failure(e) =>
println(e.getLocalizedMessage)
system.terminate()
}
有趣的是计数器打印如下&每次我得到不同的数字。如果我删除 .to(Sink.foreach(_.input))
行,我得到的计数为 1839.
Counter0 1445
Counter1 1667
首先我希望 Counter0 的值比 Counter1 高,因为 Counter0 在 Counter1 之前的阶段,我希望所有的行都被处理并且计数器应该打印总行数 1839。
知道这种情况下发生了什么吗? akka 流是否在两者之间丢弃项目?
您实际上并不是在等待直播结束。
您正在使用 to
附加 Sink.foreach(...)
,这会丢弃 Sink.foreach
阶段的处理细节,仅保留早期阶段的处理阶段。
此外,请记住,您在每一步都在做同样的事情(via
、map
、via
,然后是 to
)。因此,您只跟踪由 FileIO.from(...)
创建的第一个图形步骤的处理阶段。这意味着您只是在等待读取完整文件,而不是等待任何后续处理步骤。
您只需要保留两者的结果并等待它们完成即可。
val stream =
lineSource
.via(parseLine)
.toMat(Sink.foreach(_.input))(Keep.both)
val resultFutures: (Future[IOResult], Future[Done]) = stream.run()
val resultsFuture = Future.sequence(List(resultFutures._1, resultFutures._2))
resultsFuture.onComplete {
case Success(List(ioResult, done)) =>
println(ioResult)
println(done)
println(counter0.get())
actorSystem.terminate()
case Failure(e) =>
println(e.getLocalizedMessage)
actorSystem.terminate()
}
或者,您可以选择仅跟踪最后一个处理阶段(在本例中为 Sink.foreach(...)
)
val stream =
lineSource
.via(parseLine)
.toMat(Sink.foreach(_.input))(Keep.right)
val resuleFuture: Future[Done] = stream.run()
resuleFuture.onComplete({
case Success(_) =>
println("Counter0 " + counter0.get())
actorSystem.terminate()
case Failure(e) =>
println(e.getLocalizedMessage)
actorSystem.terminate()
})
我正在使用 Akka 流处理包含 1839 行的 CSV 文件。我添加了计数器来计算处理的行数。
这是我的来源,我确保输入文件中的每一行都少于 700 个字符。
case class ParsedLine(input: String, field1: String, field2: String, field3: String)
val counter0 = new AtomicInteger()
val counter1 = new AtomicInteger()
val lineSource = FileIO
.fromPath(Paths.get(InputFile))
.via(Framing.delimiter(ByteString("\n"), 1024, allowTruncation = true))
.map { l =>
counter0.incrementAndGet()
l.utf8String
}
val parseLine = Flow[String].map { l =>
val words = l.split(",")
ParsedLine(l, words(0), words(1), words(2))
}
这个源码处理如下,对应源码中的每一行,输出中应该有一个处理过的行。
val done = lineSource
.via(parseLine)
.to(Sink.foreach(_.input))
.run()
done.onComplete {
case Success(_) =>
println("Counter0 " + counter0.get())
println("Counter1 " + counter1.get())
system.terminate()
case Failure(e) =>
println(e.getLocalizedMessage)
system.terminate()
}
有趣的是计数器打印如下&每次我得到不同的数字。如果我删除 .to(Sink.foreach(_.input))
行,我得到的计数为 1839.
Counter0 1445
Counter1 1667
首先我希望 Counter0 的值比 Counter1 高,因为 Counter0 在 Counter1 之前的阶段,我希望所有的行都被处理并且计数器应该打印总行数 1839。
知道这种情况下发生了什么吗? akka 流是否在两者之间丢弃项目?
您实际上并不是在等待直播结束。
您正在使用 to
附加 Sink.foreach(...)
,这会丢弃 Sink.foreach
阶段的处理细节,仅保留早期阶段的处理阶段。
此外,请记住,您在每一步都在做同样的事情(via
、map
、via
,然后是 to
)。因此,您只跟踪由 FileIO.from(...)
创建的第一个图形步骤的处理阶段。这意味着您只是在等待读取完整文件,而不是等待任何后续处理步骤。
您只需要保留两者的结果并等待它们完成即可。
val stream =
lineSource
.via(parseLine)
.toMat(Sink.foreach(_.input))(Keep.both)
val resultFutures: (Future[IOResult], Future[Done]) = stream.run()
val resultsFuture = Future.sequence(List(resultFutures._1, resultFutures._2))
resultsFuture.onComplete {
case Success(List(ioResult, done)) =>
println(ioResult)
println(done)
println(counter0.get())
actorSystem.terminate()
case Failure(e) =>
println(e.getLocalizedMessage)
actorSystem.terminate()
}
或者,您可以选择仅跟踪最后一个处理阶段(在本例中为 Sink.foreach(...)
)
val stream =
lineSource
.via(parseLine)
.toMat(Sink.foreach(_.input))(Keep.right)
val resuleFuture: Future[Done] = stream.run()
resuleFuture.onComplete({
case Success(_) =>
println("Counter0 " + counter0.get())
actorSystem.terminate()
case Failure(e) =>
println(e.getLocalizedMessage)
actorSystem.terminate()
})