文件更改时更新流

Update stream on file change

使用下面的代码,我使用 Akka 流读取并打印文件的内容:

package playground

import java.nio.file.Paths
import akka.actor.ActorSystem
import akka.stream.scaladsl.{FileIO, Framing, Sink, Source}
import akka.util.ByteString
import akka.stream.ActorMaterializer

object Greeter extends App {

  implicit val system = ActorSystem("map-management-service")
  implicit val materializer = ActorMaterializer()

  FileIO.fromPath(Paths.get("a.csv"))
    .via(Framing.delimiter(ByteString("\n"), 256, true).map(_.utf8String)).runForeach(println)

}

我对使用 Akka 流的理解是,如果文件 changes/updates 处理代码,在这种情况下 println 被触发,所以每次更新文件时都会重新读取整个文件。但这并没有发生——文件被读取了一次。

应该如何修改,以便每次更新文件 a.csv 时重新读取文件并重新执行 println 代码

Alpakka 的 DirectoryChangesSource 可能适合您的用例。例如:

import akka.stream.alpakka.file.DirectoryChange
import akka.stream.alpakka.file.scaladsl.DirectoryChangesSource

implicit val system = ActorSystem("map-management-service")
implicit val materializer = ActorMaterializer()

val myFile = Paths.get("a.csv")
val changes = DirectoryChangesSource(Paths.get("."), pollInterval = 3.seconds, maxBufferSize = 1000)

changes
  .filter {
    case (path, dirChange) =>
      path.endsWith(myFile) && (dirChange == DirectoryChange.Creation || dirChange == DirectoryChange.Modification)
  }
  .flatMapConcat(_ => FileIO.fromPath(myFile).via(Framing.delimiter(ByteString("\n"), 256, true)))
  .map(_.utf8String)
  .runForeach(println)

上面的代码片段在创建文件时打印文件内容,每当文件被修改时,每隔三秒轮询一次。

我想扩展 with a fully runnable Ammonite script:

import $ivy.`com.lightbend.akka::akka-stream-alpakka-file:1.1.1`

import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{ FileIO, Framing }
import akka.stream.alpakka.file.DirectoryChange
import akka.stream.alpakka.file.scaladsl.DirectoryChangesSource
import akka.util.ByteString
import java.nio.file.Paths
import scala.concurrent.duration._

implicit val system = ActorSystem("map-management-service")
implicit val materializer = ActorMaterializer()

val myFile = Paths.get("a.csv")
val changes = DirectoryChangesSource(Paths.get("."), pollInterval = 3.seconds, maxBufferSize = 1000)

changes
  .filter {
    case (path, dirChange) =>
      path.endsWith(myFile) && (dirChange == DirectoryChange.Creation || dirChange == DirectoryChange.Modification)
  }
  .flatMapConcat {
    case (path, _) => FileIO.fromPath(path).via(Framing.delimiter(ByteString("\n"), 256, true))
  }
  .map(_.utf8String)
  .runForeach(println)

请直接点赞他对原创想法的回答。