Akka 流:读取多个文件
Akka streams: Reading multiple files
我有一个文件列表。我要:
- 将所有这些作为单一来源阅读。
- 文件应该按顺序读取。 (无循环赛)
- 在任何时候都不应要求任何文件完全在内存中。
- 从文件中读取错误会导致流崩溃。
感觉这应该可行:(Scala,akka-streams v2.4.7)
val sources = Seq("file1", "file2").map(new File(_)).map(f => FileIO.fromPath(f.toPath)
.via(Framing.delimiter(ByteString(System.lineSeparator), 10000, allowTruncation = true))
.map(bs => bs.utf8String)
)
val source = sources.reduce( (a, b) => Source.combine(a, b)(MergePreferred(_)) )
source.map(_ => 1).runWith(Sink.reduce[Int](_ + _)) // counting lines
但这会导致编译错误,因为 FileIO
具有与之关联的物化值,而 Source.combine
不支持该值。
映射物化值让我想知道文件读取错误是如何处理的,但编译:
val sources = Seq("file1", "file2").map(new File(_)).map(f => FileIO.fromPath(f.toPath)
.via(Framing.delimiter(ByteString(System.lineSeparator), 10000, allowTruncation = true))
.map(bs => bs.utf8String)
.mapMaterializedValue(f => NotUsed.getInstance())
)
val source = sources.reduce( (a, b) => Source.combine(a, b)(MergePreferred(_)) )
source.map(_ => 1).runWith(Sink.reduce[Int](_ + _)) // counting lines
但在运行时抛出 IllegalArgumentException:
java.lang.IllegalArgumentException: requirement failed: The inlets [] and outlets [MergePreferred.out] must correspond to the inlets [MergePreferred.preferred] and outlets [MergePreferred.out]
我确实有一个答案 - 不要使用 akka.FileIO
。这似乎工作正常,例如:
val sources = Seq("sample.txt", "sample2.txt").map(io.Source.fromFile(_).getLines()).reduce(_ ++ _)
val source = Source.fromIterator[String](() => sources)
val lineCount = source.map(_ => 1).runWith(Sink.reduce[Int](_ + _))
我还是想知道有没有更好的解决办法
为了清楚地模块化不同的关注点,下面的代码并没有尽可能简洁。
// Given a stream of bytestrings delimited by the system line separator we can get lines represented as Strings
val lines = Framing.delimiter(ByteString(System.lineSeparator), 10000, allowTruncation = true).map(bs => bs.utf8String)
// given as stream of Paths we read those files and count the number of lines
val lineCounter = Flow[Path].flatMapConcat(path => FileIO.fromPath(path).via(lines)).fold(0l)((count, line) => count + 1).toMat(Sink.head)(Keep.right)
// Here's our test data source (replace paths with real paths)
val testFiles = Source(List("somePathToFile1", "somePathToFile2").map(new File(_).toPath))
// Runs the line counter over the test files, returns a Future, which contains the number of lines, which we then print out to the console when it completes
testFiles runWith lineCounter foreach println
更新哦,我没有看到采纳的答案,因为我没有刷新页面>_<。无论如何我都会把它留在这里,因为我还添加了一些关于错误处理的注释。
我相信以下程序可以满足您的要求:
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.{ActorMaterializer, IOResult}
import akka.stream.scaladsl.{FileIO, Flow, Framing, Keep, Sink, Source}
import akka.util.ByteString
import scala.concurrent.{Await, Future}
import scala.util.{Failure, Success}
import scala.util.control.NonFatal
import java.nio.file.Paths
import scala.concurrent.duration._
object TestMain extends App {
implicit val actorSystem = ActorSystem("test")
implicit val materializer = ActorMaterializer()
implicit def ec = actorSystem.dispatcher
val sources = Vector("build.sbt", ".gitignore")
.map(Paths.get(_))
.map(p =>
FileIO.fromPath(p)
.viaMat(Framing.delimiter(ByteString(System.lineSeparator()), Int.MaxValue, allowTruncation = true))(Keep.left)
.mapMaterializedValue { f =>
f.onComplete {
case Success(r) if r.wasSuccessful => println(s"Read ${r.count} bytes from $p")
case Success(r) => println(s"Something went wrong when reading $p: ${r.getError}")
case Failure(NonFatal(e)) => println(s"Something went wrong when reading $p: $e")
}
NotUsed
}
)
val finalSource = Source(sources).flatMapConcat(identity)
val result = finalSource.map(_ => 1).runWith(Sink.reduce[Int](_ + _))
result.onComplete {
case Success(n) => println(s"Read $n lines total")
case Failure(e) => println(s"Reading failed: $e")
}
Await.ready(result, 10.seconds)
actorSystem.terminate()
}
这里的关键是 flatMapConcat()
方法:它将流中的每个元素转换为源和 returns 这些源产生的元素流,如果它们是 运行 顺序.
至于处理错误,您可以在 mapMaterializedValue
参数中为 future 添加一个处理程序,或者您可以通过将处理程序放在Sink.foreach
物化的未来价值。我在上面的例子中都做了,如果你测试它,比如说,在一个不存在的文件上,你会看到同样的错误信息会被打印两次。不幸的是,flatMapConcat()
不收集具体化的值,坦率地说,我看不出它可以正常地做到这一点,因此如有必要,您必须单独处理它们。
我有一个文件列表。我要:
- 将所有这些作为单一来源阅读。
- 文件应该按顺序读取。 (无循环赛)
- 在任何时候都不应要求任何文件完全在内存中。
- 从文件中读取错误会导致流崩溃。
感觉这应该可行:(Scala,akka-streams v2.4.7)
val sources = Seq("file1", "file2").map(new File(_)).map(f => FileIO.fromPath(f.toPath)
.via(Framing.delimiter(ByteString(System.lineSeparator), 10000, allowTruncation = true))
.map(bs => bs.utf8String)
)
val source = sources.reduce( (a, b) => Source.combine(a, b)(MergePreferred(_)) )
source.map(_ => 1).runWith(Sink.reduce[Int](_ + _)) // counting lines
但这会导致编译错误,因为 FileIO
具有与之关联的物化值,而 Source.combine
不支持该值。
映射物化值让我想知道文件读取错误是如何处理的,但编译:
val sources = Seq("file1", "file2").map(new File(_)).map(f => FileIO.fromPath(f.toPath)
.via(Framing.delimiter(ByteString(System.lineSeparator), 10000, allowTruncation = true))
.map(bs => bs.utf8String)
.mapMaterializedValue(f => NotUsed.getInstance())
)
val source = sources.reduce( (a, b) => Source.combine(a, b)(MergePreferred(_)) )
source.map(_ => 1).runWith(Sink.reduce[Int](_ + _)) // counting lines
但在运行时抛出 IllegalArgumentException:
java.lang.IllegalArgumentException: requirement failed: The inlets [] and outlets [MergePreferred.out] must correspond to the inlets [MergePreferred.preferred] and outlets [MergePreferred.out]
我确实有一个答案 - 不要使用 akka.FileIO
。这似乎工作正常,例如:
val sources = Seq("sample.txt", "sample2.txt").map(io.Source.fromFile(_).getLines()).reduce(_ ++ _)
val source = Source.fromIterator[String](() => sources)
val lineCount = source.map(_ => 1).runWith(Sink.reduce[Int](_ + _))
我还是想知道有没有更好的解决办法
为了清楚地模块化不同的关注点,下面的代码并没有尽可能简洁。
// Given a stream of bytestrings delimited by the system line separator we can get lines represented as Strings
val lines = Framing.delimiter(ByteString(System.lineSeparator), 10000, allowTruncation = true).map(bs => bs.utf8String)
// given as stream of Paths we read those files and count the number of lines
val lineCounter = Flow[Path].flatMapConcat(path => FileIO.fromPath(path).via(lines)).fold(0l)((count, line) => count + 1).toMat(Sink.head)(Keep.right)
// Here's our test data source (replace paths with real paths)
val testFiles = Source(List("somePathToFile1", "somePathToFile2").map(new File(_).toPath))
// Runs the line counter over the test files, returns a Future, which contains the number of lines, which we then print out to the console when it completes
testFiles runWith lineCounter foreach println
更新哦,我没有看到采纳的答案,因为我没有刷新页面>_<。无论如何我都会把它留在这里,因为我还添加了一些关于错误处理的注释。
我相信以下程序可以满足您的要求:
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.{ActorMaterializer, IOResult}
import akka.stream.scaladsl.{FileIO, Flow, Framing, Keep, Sink, Source}
import akka.util.ByteString
import scala.concurrent.{Await, Future}
import scala.util.{Failure, Success}
import scala.util.control.NonFatal
import java.nio.file.Paths
import scala.concurrent.duration._
object TestMain extends App {
implicit val actorSystem = ActorSystem("test")
implicit val materializer = ActorMaterializer()
implicit def ec = actorSystem.dispatcher
val sources = Vector("build.sbt", ".gitignore")
.map(Paths.get(_))
.map(p =>
FileIO.fromPath(p)
.viaMat(Framing.delimiter(ByteString(System.lineSeparator()), Int.MaxValue, allowTruncation = true))(Keep.left)
.mapMaterializedValue { f =>
f.onComplete {
case Success(r) if r.wasSuccessful => println(s"Read ${r.count} bytes from $p")
case Success(r) => println(s"Something went wrong when reading $p: ${r.getError}")
case Failure(NonFatal(e)) => println(s"Something went wrong when reading $p: $e")
}
NotUsed
}
)
val finalSource = Source(sources).flatMapConcat(identity)
val result = finalSource.map(_ => 1).runWith(Sink.reduce[Int](_ + _))
result.onComplete {
case Success(n) => println(s"Read $n lines total")
case Failure(e) => println(s"Reading failed: $e")
}
Await.ready(result, 10.seconds)
actorSystem.terminate()
}
这里的关键是 flatMapConcat()
方法:它将流中的每个元素转换为源和 returns 这些源产生的元素流,如果它们是 运行 顺序.
至于处理错误,您可以在 mapMaterializedValue
参数中为 future 添加一个处理程序,或者您可以通过将处理程序放在Sink.foreach
物化的未来价值。我在上面的例子中都做了,如果你测试它,比如说,在一个不存在的文件上,你会看到同样的错误信息会被打印两次。不幸的是,flatMapConcat()
不收集具体化的值,坦率地说,我看不出它可以正常地做到这一点,因此如有必要,您必须单独处理它们。