订阅 Akka 流,读取文件
Subscribing to an Akka stream, reading files
我正在尝试使用 Akka Streams 在 Scala 中读取文件,我想将结果放入列表中。我尝试了以下代码,列表在接收器内增加了新值,但在接收器外我得到了一个空列表。
def readStream (path : String, date : String) : List[Array[String]] = {
var lines: List[scala.Array[String]] = List[scala.Array[String]]()
implicit val system = ActorSystem("Sys")
val settings = ActorMaterializerSettings(system)
implicit val materializer = ActorMaterializer(settings)
val sink: Sink[String, Future[Done]] = Sink.foreach((x : String) => {
val list : List[scala.Array[String]] = List(x.split("|"))
lines = lines ++ list
// println(lines.length)
})
val result: Unit = FileIO.fromPath(Paths.get(path + "transactions_" + date + ".data"))
.via(Framing.delimiter(ByteString("\n"), 256, true).map(_.utf8String))
.to(sink)
.run()
lines
}
三件事:(1) 将角色系统和实体化器传递给您的方法(显式或作为隐式参数)而不是在方法内部创建它们,(2) 使用 Sink.seq
,以及 (3) 使用toMat
和Keep.right
得到Sink
的物化值(to
保留Source
的物化值):
val result: Future[Seq[String]] =
FileIO.fromPath(...)
.via(Framing.delimiter(ByteString("\n"), 256, true))
.map(_.utf8String)
.toMat(Sink.seq)(Keep.right)
.run()
或者,使用 toMat
和 Keep.right
的 shorthand 是 runWith
:
val result: Future[Seq[String]] =
FileIO.fromPath(...)
.via(Framing.delimiter(ByteString("\n"), 256, true))
.map(_.utf8String)
.runWith(Sink.seq)
我正在尝试使用 Akka Streams 在 Scala 中读取文件,我想将结果放入列表中。我尝试了以下代码,列表在接收器内增加了新值,但在接收器外我得到了一个空列表。
def readStream (path : String, date : String) : List[Array[String]] = {
var lines: List[scala.Array[String]] = List[scala.Array[String]]()
implicit val system = ActorSystem("Sys")
val settings = ActorMaterializerSettings(system)
implicit val materializer = ActorMaterializer(settings)
val sink: Sink[String, Future[Done]] = Sink.foreach((x : String) => {
val list : List[scala.Array[String]] = List(x.split("|"))
lines = lines ++ list
// println(lines.length)
})
val result: Unit = FileIO.fromPath(Paths.get(path + "transactions_" + date + ".data"))
.via(Framing.delimiter(ByteString("\n"), 256, true).map(_.utf8String))
.to(sink)
.run()
lines
}
三件事:(1) 将角色系统和实体化器传递给您的方法(显式或作为隐式参数)而不是在方法内部创建它们,(2) 使用 Sink.seq
,以及 (3) 使用toMat
和Keep.right
得到Sink
的物化值(to
保留Source
的物化值):
val result: Future[Seq[String]] =
FileIO.fromPath(...)
.via(Framing.delimiter(ByteString("\n"), 256, true))
.map(_.utf8String)
.toMat(Sink.seq)(Keep.right)
.run()
或者,使用 toMat
和 Keep.right
的 shorthand 是 runWith
:
val result: Future[Seq[String]] =
FileIO.fromPath(...)
.via(Framing.delimiter(ByteString("\n"), 256, true))
.map(_.utf8String)
.runWith(Sink.seq)