Why does my Akka stream stop processing a large file (~250,000 lines of strings) but work for small files?

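My stream works for a small file of 1,000 lines, but it stops when I test it on a large file of ~12 MB and ~250,000 lines. I tried applying backpressure with a buffer and throttling the stream, but the result is the same.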

Here is my data stream:

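import java.io.File

import akka.stream.{ClosedShape, FlowShape}
import akka.stream.scaladsl.{Flow, GraphDSL, RunnableGraph, Sink, Source}
import akka.stream.scaladsl.GraphDSL.Implicits._ // provides the ~> operator
import scala.util.Try

class UserDataStreaming(usersFile: File) {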

  implicit val system = ActorSystemContainer.getInstance().getSystem
  implicit val materializer = ActorSystemContainer.getInstance().getMaterializer

  def startStreaming() = {

    val graph = RunnableGraph.fromGraph(GraphDSL.create() {
      implicit builder =>

      val usersSource = builder.add(Source.fromIterator(() => usersDataLines)).out

      val stringToUserFlowShape: FlowShape[String, User] = builder.add(csvToUser)
      val averageAgeFlowShape: FlowShape[User, (String, Int, Int)] = builder.add(averageUserAgeFlow)

      val averageAgeSink = builder.add(Sink.foreach(averageUserAgeSink)).in

      usersSource ~> stringToUserFlowShape ~> averageAgeFlowShape ~> averageAgeSink

      ClosedShape
    })

    graph.run()
  }

  val usersDataLines = scala.io.Source.fromFile(usersFile, "ISO-8859-1").getLines().drop(1)
  val csvToUser = Flow[String].map(_.split(";").map(_.trim)).map(csvLinesArrayToUser)

  def csvLinesArrayToUser(line: Array[String]) = User(line(0), line(1), line(2))

  def averageUserAgeSink[usersSource](source: usersSource) {
    source match {
      case (age: String, count: Int, totalAge: Int) => println(s"age = $age; Average reader age is: ${Try(totalAge/count).getOrElse(0)} count = $count and total age = $totalAge")
      case bad => println(s"Bad case: $bad")
    }
  }

  def averageUserAgeFlow = Flow[User].fold(("", 0, 0)) {
    (nums: (String, Int, Int), user: User) =>
      var counter: Option[Int] = None
      var totalAge: Option[Int] = None

      val ageInt = Try(user.age.substring(1, user.age.length-1).toInt)
      if (ageInt.isSuccess) {
        counter = Some(nums._2 + 1)
        totalAge = Some(nums._3 + ageInt.get)
      }
      else {
        counter = Some(nums._2 + 0)
        totalAge = Some(nums._3 + 0)
      }

      //println(counter.get)

      (user.age, counter.get, totalAge.get)
  }
}

Here is my main:

object Main {

  def main(args: Array[String]): Unit = {

    implicit val system = ActorSystemContainer.getInstance().getSystem
    implicit val materializer = ActorSystemContainer.getInstance().getMaterializer

    val usersFile = new File("data/BX-Users.csv")

    println(usersFile.length())

    val userDataStreamer = new UserDataStreaming(usersFile)
    userDataStreamer.startStreaming()
  }
}

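There is probably an error in one of the rows of your CSV file. In that case the exception fails the stream and it stops. Try defining your flow like this: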

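// `log` is assumed to be a logger already in scope (for example an Akka LoggingAdapter).
val csvToUser = Flow[String]
  .map(line => csvLinesArrayToUser(line.split(";").map(_.trim)))
  .withAttributes(ActorAttributes.supervisionStrategy {
    case ex: Throwable =>
      log.error("Error parsing row event: {}", ex)
      Supervision.Resume
  })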

In this case the possible exception is caught, and the stream drops the failing element and continues.
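
To illustrate the resume behaviour, here is a minimal self-contained sketch. It assumes Akka 2.6 or later (where the materializer is derived from the implicit ActorSystem) and uses a hypothetical three-field User case class and made-up sample rows, since the real ones are not shown in the question. The decider resumes only on ArrayIndexOutOfBoundsException, which is what a row with too few columns would raise; your file may fail for a different reason.

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.{ActorAttributes, Supervision}
import akka.stream.scaladsl.{Flow, Sink, Source}

import scala.concurrent.Await
import scala.concurrent.duration._

object ResumeSketch {
  // Hypothetical stand-in for the User class from the question.
  final case class User(id: String, location: String, age: String)

  // Drop the failing element on a short row; stop on anything unexpected.
  val decider: Supervision.Decider = {
    case _: ArrayIndexOutOfBoundsException => Supervision.Resume
    case _                                 => Supervision.Stop
  }

  // The attribute is inherited by both map stages of this flow.
  val csvToUser: Flow[String, User, NotUsed] = Flow[String]
    .map(_.split(";").map(_.trim))
    .map(cols => User(cols(0), cols(1), cols(2)))
    .withAttributes(ActorAttributes.supervisionStrategy(decider))

  // Runs the flow over the given lines and collects the parsed users.
  def parseAll(lines: List[String])(implicit system: ActorSystem): Seq[User] =
    Await.result(Source(lines).via(csvToUser).runWith(Sink.seq), 5.seconds)

  def main(args: Array[String]): Unit = {
    implicit val system: ActorSystem = ActorSystem("resume-sketch")
    try {
      // Made-up sample rows; the middle one has too few columns.
      val rows = List("1;nyc, usa;\"18\"", "broken-row", "3;porto, portugal;\"40\"")
      val users = parseAll(rows)
      println(s"parsed ${users.size} of ${rows.size} rows")
    } finally system.terminate()
  }
}
```

The malformed middle row is dropped and the two valid rows still reach the sink. Note that Resume discards the element silently unless the decider logs it, so it is worth logging inside the decider while you track down the bad rows.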

If you use Supervision.Stop instead, which is also the default, the stream stops at the first failing element.
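
To find out which exception is stopping the stream, it can help to look at the materialized Future rather than discarding it. The sketch below is only an illustration under the same assumptions as before (Akka 2.6 or later, made-up rows, a hypothetical StopSketch object): it uses the default Stop strategy and returns either the number of rows processed or the failure that ended the stream.

```scala
import akka.actor.ActorSystem
import akka.stream.scaladsl.Source

import scala.concurrent.Await
import scala.concurrent.duration._
import scala.util.Try

object StopSketch {
  // Counts the rows that parse; with the default Stop strategy the first
  // bad row fails the stream, and the failure surfaces in the Future.
  def countRows(lines: List[String])(implicit system: ActorSystem): Try[Int] = {
    val done = Source(lines)
      .map(_.split(";").map(_.trim))
      .map(cols => cols(2)) // throws on a row with fewer than 3 columns
      .runFold(0)((n, _) => n + 1)
    Try(Await.result(done, 5.seconds))
  }

  def main(args: Array[String]): Unit = {
    implicit val system: ActorSystem = ActorSystem("stop-sketch")
    try {
      println(countRows(List("1;nyc;18", "2;paris;30")))  // a Success
      println(countRows(List("1;nyc;18", "broken-row")))  // a Failure carrying the exception
    } finally system.terminate()
  }
}
```

In the question's graph the sink is added through the builder inside GraphDSL.create(), so graph.run() does not hand back the sink's Future and a failure is easy to miss. Passing the sink to GraphDSL.create so that its materialized value is exposed would let you inspect the failure in the same way.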