在 scala 中使用 akka 流读取多个文件

Reading multiple files with akka streams in scala

我正在尝试使用 akka 流读取多个文件并将结果放入列表中。 我可以毫无问题地读取一个文件。 return 类型是 Future[Seq[String]]。问题是处理 Future 中的序列必须在 onComplete{} 中进行。

我正在尝试以下代码,但显然它不起作用。 onComplete 之外的列表 acc 为空。但在 inComplete 中保留值。我明白这个问题,但我不知道如何解决这个问题。

// works fine  
def readStream(path: String, date: String): Future[Seq[String]] = {
implicit val system = ActorSystem("Sys")
val settings = ActorMaterializerSettings(system)
implicit val materializer = ActorMaterializer(settings)

val result: Future[Seq[String]] =
  FileIO.fromPath(Paths.get(path + "transactions_" + date + 
".data"))
    .via(Framing.delimiter(ByteString("\n"), 256, true))
    .map(_.utf8String)
    .toMat(Sink.seq)(Keep.right)
    .run()
 var aa: List[scala.Array[String]] = Nil
 result.onComplete(x => {
  aa = x.get.map(line => line.split('|')).toList
})
 result
}

//this won't work  
def concatFiles(path : String, date : String, numberOfDays : Int) : 
List[scala.Array[String]] = {
val formatter = DateTimeFormatter.ofPattern("yyyyMMdd");
val formattedDate = LocalDate.parse(date, formatter);
var acc = List[scala.Array[String]]()

for( a <- 0 to numberOfDays){
  val date = formattedDate.minusDays(a).toString().replace("-", "")


  val transactions = readStream(path , date)
  var result: List[scala.Array[String]] = Nil
  transactions.onComplete(x => {
    result = x.get.map(line => line.split('|')).toList 
    acc=  acc ++ result })
}
acc}

一般解

给定一个 Paths 值的迭代器 Source 的文件行可以通过组合 FileIO & flatMapConcat:

创建
val lineSourceFromPaths : (() => Iterator[Path]) => Source[String, _] = pathsIterator =>
  Source
    .fromIterator(pathsIterator)
    .flatMapConcat { path =>
      FileIO
        .fromPath(path)
        .via(Framing.delimiter(ByteString("\n"), 256, true))
        .map(_.utf8String)
    }

问题申请

您的 List 为空的原因是 Future 值尚未完成,因此您的可变列表未在函数 return 列表之前更新。

问题中的代码批判

问题中代码的组织和风格暗示了与 akkaFuture 相关的一些误解。我认为您在不了解您尝试使用的工具的基础知识的情况下尝试一个相当复杂的工作流程。

1.You 不应在每次调用函数时创建 ActorSystem。每个应用程序通常有 1 个 ActorSystem,并且只创建一次。

implicit val system = ActorSystem("Sys")
val settings = ActorMaterializerSettings(system)
implicit val materializer = ActorMaterializer(settings)

def readStream(...

2.You 应该尽量避免可变集合,而是使用具有相应功能的 Iterator

def concatFiles(path : String, date : String, numberOfDays : Int) : List[scala.Array[String]] = {

  val formattedDate = LocalDate.parse(date, DateTimeFormatter.ofPattern("yyyyMMdd"))

  val pathsIterator : () => Iterator[Path] = () => 
    Iterator
      .range(0, numberOfDays+1)
      .map(formattedDate.minusDays)
      .map(_.String().replace("-", "")
      .map(path => Paths.get(path + "transactions_" + date + ".data")

  lineSourceFromPaths(pathsIterator)

3.Since 你正在处理 Futures 你不应该等待 Futures 完成,而是应该将 concateFiles 的 return 类型更改为 Future[List[Array[String]]].