Scala 流式传输 live/growing 文件
Scala streaming a live/growing file
我的 Scala 应用程序启动了一个将文件写入磁盘的外部进程。在一个单独的线程中,我想读取该文件并将其内容复制到 OutputStream
,直到该过程完成并且文件不再增长。
有几个极端情况需要考虑:
- 线程准备启动时文件可能还不存在。
- 线程复制的速度可能比进程写入的速度快。换句话说,它可能在文件仍在增长时到达文件末尾。
顺便说一句,我可以向线程传递一个 processCompletionFuture
变量,它指示文件何时完成增长。
有没有一种优雅高效的方法来做到这一点?也许使用 Akka Streams 或 actors?(我试过使用 FileInputStream
之外的 Akka Stream,但是一旦输入流中没有更多字节,流似乎就会终止,这发生在案例 #2).
Alpakka 是一个基于 Akka Streams 的库,它有一个模仿 tail -f
Unix 命令的 FileTailSource
实用程序。例如:
import akka.NotUsed
import akka.stream._
import akka.stream.scaladsl._
import akka.stream.alpakka.file.scaladsl._
import akka.util.{ ByteString, Timeout }
import java.io.OutputStream
import java.nio.file.Path
import scala.concurrent._
import scala.concurrent.duration._
val path: Path = ???
val maxLineSize = 10000
val tailSource: Source[ByteString, NotUsed] = FileTailSource(
path = path,
maxChunkSize = maxLineSize,
startingPosition = 0,
pollingInterval = 500.millis
).via(Framing.delimiter(ByteString(System.lineSeparator), maxLineSize, true))
上面的 tailSource
逐行读取整个文件,并每 500 毫秒连续读取新附加的数据。要将流内容复制到 OutputStream
,请将源连接到 StreamConverters.fromOutputStream
接收器:
val stream: Future[IOResult] =
tailSource
.runWith(StreamConverters.fromOutputStream(() => new OutputStream {
override def write(i: Int): Unit = ???
override def write(bytes: Array[Byte]): Unit = ???
}))
(请注意,有一个 FileTailSource.lines
方法可以生成 Source[String, NotUsed]
,但在这种情况下,使用 ByteString
而不是 String
更合适。这这就是该示例使用 FileTailSource.apply()
的原因,它会生成 Source[ByteString, NotUsed]
。)
如果实现时文件不存在,流将失败。因此,您需要在 运行 流之前确认文件的存在。这可能有点矫枉过正,但一个想法是为此使用 Alpakka 的 DirectoryChangesSource
。
我的 Scala 应用程序启动了一个将文件写入磁盘的外部进程。在一个单独的线程中,我想读取该文件并将其内容复制到 OutputStream
,直到该过程完成并且文件不再增长。
有几个极端情况需要考虑:
- 线程准备启动时文件可能还不存在。
- 线程复制的速度可能比进程写入的速度快。换句话说,它可能在文件仍在增长时到达文件末尾。
顺便说一句,我可以向线程传递一个 processCompletionFuture
变量,它指示文件何时完成增长。
有没有一种优雅高效的方法来做到这一点?也许使用 Akka Streams 或 actors?(我试过使用 FileInputStream
之外的 Akka Stream,但是一旦输入流中没有更多字节,流似乎就会终止,这发生在案例 #2).
Alpakka 是一个基于 Akka Streams 的库,它有一个模仿 tail -f
Unix 命令的 FileTailSource
实用程序。例如:
import akka.NotUsed
import akka.stream._
import akka.stream.scaladsl._
import akka.stream.alpakka.file.scaladsl._
import akka.util.{ ByteString, Timeout }
import java.io.OutputStream
import java.nio.file.Path
import scala.concurrent._
import scala.concurrent.duration._
val path: Path = ???
val maxLineSize = 10000
val tailSource: Source[ByteString, NotUsed] = FileTailSource(
path = path,
maxChunkSize = maxLineSize,
startingPosition = 0,
pollingInterval = 500.millis
).via(Framing.delimiter(ByteString(System.lineSeparator), maxLineSize, true))
上面的 tailSource
逐行读取整个文件,并每 500 毫秒连续读取新附加的数据。要将流内容复制到 OutputStream
,请将源连接到 StreamConverters.fromOutputStream
接收器:
val stream: Future[IOResult] =
tailSource
.runWith(StreamConverters.fromOutputStream(() => new OutputStream {
override def write(i: Int): Unit = ???
override def write(bytes: Array[Byte]): Unit = ???
}))
(请注意,有一个 FileTailSource.lines
方法可以生成 Source[String, NotUsed]
,但在这种情况下,使用 ByteString
而不是 String
更合适。这这就是该示例使用 FileTailSource.apply()
的原因,它会生成 Source[ByteString, NotUsed]
。)
如果实现时文件不存在,流将失败。因此,您需要在 运行 流之前确认文件的存在。这可能有点矫枉过正,但一个想法是为此使用 Alpakka 的 DirectoryChangesSource
。