Scala 流式传输 live/growing 文件

Scala streaming a live/growing file

我的 Scala 应用程序启动了一个将文件写入磁盘的外部进程。在一个单独的线程中,我想读取该文件并将其内容复制到 OutputStream,直到该过程完成并且文件不再增长。

有几个极端情况需要考虑:

  1. 线程准备启动时文件可能还不存在。
  2. 线程复制的速度可能比进程写入的速度快。换句话说,它可能在文件仍在增长时到达文件末尾。

顺便说一句,我可以向线程传递一个 processCompletionFuture 变量,它指示文件何时完成增长。

有没有一种优雅高效的方法来做到这一点?也许使用 Akka Streams 或 actors?(我试过使用 FileInputStream 之外的 Akka Stream,但是一旦输入流中没有更多字节,流似乎就会终止,这发生在案例 #2).

Alpakka 是一个基于 Akka Streams 的库,它有一个模仿 tail -f Unix 命令的 FileTailSource 实用程序。例如:

import akka.NotUsed
import akka.stream._
import akka.stream.scaladsl._
import akka.stream.alpakka.file.scaladsl._
import akka.util.{ ByteString, Timeout }
import java.io.OutputStream
import java.nio.file.Path
import scala.concurrent._
import scala.concurrent.duration._

val path: Path = ???

val maxLineSize = 10000

val tailSource: Source[ByteString, NotUsed] = FileTailSource(
  path = path,
  maxChunkSize = maxLineSize,
  startingPosition = 0,
  pollingInterval = 500.millis
).via(Framing.delimiter(ByteString(System.lineSeparator), maxLineSize, true))

上面的 tailSource 逐行读取整个文件,并每 500 毫秒连续读取新附加的数据。要将流内容复制到 OutputStream,请将源连接到 StreamConverters.fromOutputStream 接收器:

val stream: Future[IOResult] =
  tailSource
    .runWith(StreamConverters.fromOutputStream(() => new OutputStream {
      override def write(i: Int): Unit = ???
      override def write(bytes: Array[Byte]): Unit = ???
    }))

(请注意,有一个 FileTailSource.lines 方法可以生成 Source[String, NotUsed],但在这种情况下,使用 ByteString 而不是 String 更合适。这这就是该示例使用 FileTailSource.apply() 的原因,它会生成 Source[ByteString, NotUsed]。)

如果实现时文件不存在,流将失败。因此,您需要在 运行 流之前确认文件的存在。这可能有点矫枉过正,但一个想法是为此使用 Alpakka 的 DirectoryChangesSource