Update stream on file change
With the code below, I read and print the contents of a file using Akka Streams:
package playground
import java.nio.file.Paths
import akka.actor.ActorSystem
import akka.stream.scaladsl.{FileIO, Framing, Sink, Source}
import akka.util.ByteString
import akka.stream.ActorMaterializer
object Greeter extends App {
  implicit val system = ActorSystem("map-management-service")
  implicit val materializer = ActorMaterializer()

  FileIO.fromPath(Paths.get("a.csv"))
    .via(Framing.delimiter(ByteString("\n"), 256, true).map(_.utf8String))
    .runForeach(println)
}
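My understanding of Akka Streams was that if the file changes or is updated, the processing code (println in this case) is triggered, so the whole file would be re-read every time it is updated. But that does not happen: the file is read once.
How should this be modified so that each time the file a.csv is updated, the file is re-read and the println code is executed again?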
Alpakka's DirectoryChangesSource may fit your use case. For example:
import akka.stream.alpakka.file.DirectoryChange
import akka.stream.alpakka.file.scaladsl.DirectoryChangesSource
import scala.concurrent.duration._ // needed for 3.seconds

implicit val system = ActorSystem("map-management-service")
implicit val materializer = ActorMaterializer()

val myFile = Paths.get("a.csv")
// Watch the current directory, polling for changes every three seconds
val changes = DirectoryChangesSource(Paths.get("."), pollInterval = 3.seconds, maxBufferSize = 1000)

changes
  .filter {
    // Keep only creation and modification events for a.csv
    case (path, dirChange) =>
      path.endsWith(myFile) && (dirChange == DirectoryChange.Creation || dirChange == DirectoryChange.Modification)
  }
  // Re-read the whole file for each matching event
  .flatMapConcat(_ => FileIO.fromPath(myFile).via(Framing.delimiter(ByteString("\n"), 256, true)))
  .map(_.utf8String)
  .runForeach(println)
The snippet above prints the file's contents when the file is created and whenever it is modified, polling every three seconds.
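To make the filter step easier to reason about, here is a minimal sketch of the same predicate in isolation, using only java.nio. The Change trait and the isRelevant helper are hypothetical stand-ins introduced for illustration (they are not part of Alpakka), and the sketch assumes the source emits paths resolved against the watched directory, such as ./a.csv. It shows why path.endsWith(myFile) is used: Path.endsWith compares whole name elements, so ./a.csv matches but ./ba.csv does not.

```scala
import java.nio.file.{Path, Paths}

object PathFilterSketch {
  // Hypothetical stand-in for Alpakka's DirectoryChange values,
  // so this sketch runs without any dependencies.
  sealed trait Change
  case object Creation extends Change
  case object Modification extends Change
  case object Deletion extends Change

  val myFile: Path = Paths.get("a.csv")

  // Same shape as the predicate passed to .filter in the snippet above:
  // keep creation and modification events for a.csv only.
  def isRelevant(path: Path, change: Change): Boolean =
    path.endsWith(myFile) && (change == Creation || change == Modification)

  def main(args: Array[String]): Unit = {
    println(isRelevant(Paths.get(".", "a.csv"), Modification))  // true
    println(isRelevant(Paths.get(".", "ba.csv"), Modification)) // false: name element differs
    println(isRelevant(Paths.get(".", "a.csv"), Deletion))      // false: deletions are ignored
  }
}
```

Deletion events are dropped on purpose here, since there would be no file left to re-read.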
I would like to extend the answer above with a fully runnable Ammonite script:
import $ivy.`com.lightbend.akka::akka-stream-alpakka-file:1.1.1`

import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{ FileIO, Framing }
import akka.stream.alpakka.file.DirectoryChange
import akka.stream.alpakka.file.scaladsl.DirectoryChangesSource
import akka.util.ByteString
import java.nio.file.Paths
import scala.concurrent.duration._

implicit val system = ActorSystem("map-management-service")
implicit val materializer = ActorMaterializer()

val myFile = Paths.get("a.csv")
val changes = DirectoryChangesSource(Paths.get("."), pollInterval = 3.seconds, maxBufferSize = 1000)

changes
  .filter {
    case (path, dirChange) =>
      path.endsWith(myFile) && (dirChange == DirectoryChange.Creation || dirChange == DirectoryChange.Modification)
  }
  .flatMapConcat {
    case (path, _) => FileIO.fromPath(path).via(Framing.delimiter(ByteString("\n"), 256, true))
  }
  .map(_.utf8String)
  .runForeach(println)
Please upvote his answer directly for the original idea.