订阅 Akka 流,读取文件

Subscribing to an Akka stream, reading files

我正在尝试使用 Akka Streams 在 Scala 中读取文件,我想将结果放入列表中。我尝试了以下代码,列表在接收器内增加了新值,但在接收器外我得到了一个空列表。

def readStream (path : String, date : String) : List[Array[String]] = {
  var lines: List[scala.Array[String]] = List[scala.Array[String]]()

  implicit val system = ActorSystem("Sys")
  val settings = ActorMaterializerSettings(system)
  implicit val materializer = ActorMaterializer(settings)

  val sink: Sink[String, Future[Done]] = Sink.foreach((x : String) => {
    val list : List[scala.Array[String]] = List(x.split("|"))
    lines = lines ++ list
    // println(lines.length)
  })

  val result: Unit = FileIO.fromPath(Paths.get(path + "transactions_" + date + ".data"))
    .via(Framing.delimiter(ByteString("\n"), 256, true).map(_.utf8String))
    .to(sink)
    .run()
  lines
}

三件事:(1) 将角色系统和实体化器传递给您的方法(显式或作为隐式参数)而不是在方法内部创建它们,(2) 使用 Sink.seq,以及 (3) 使用toMatKeep.right得到Sink的物化值(to保留Source的物化值):

val result: Future[Seq[String]] =
  FileIO.fromPath(...)
    .via(Framing.delimiter(ByteString("\n"), 256, true))
    .map(_.utf8String)
    .toMat(Sink.seq)(Keep.right)
    .run()

或者,使用 toMatKeep.right 的 shorthand 是 runWith:

val result: Future[Seq[String]] =
  FileIO.fromPath(...)
    .via(Framing.delimiter(ByteString("\n"), 256, true))
    .map(_.utf8String)
    .runWith(Sink.seq)