来自 ftp 的 Akka 流,逐行

Akka stream from ftp, line by line

我正在尝试使用 alpakka 和 scala 流从 ftp 服务器读取文件。 我从 Ftp.fromPath(...) 得到的类型是 Source[ByteString, Future[IOResult]]。我想逐行读取文件(这是一个 CSV 文件),但我不知道如何。

如有任何帮助,我将不胜感激。

有一种按行拆分 Source[ByteString, _] 的标准方法,称为 Framing.delimiter。可以这样使用:

val source: Source[ByteString, Future[IOResult]] = Ftp.fromPath(...)

val splitter = Framing.delimiter(
  ByteString("\n"),
  maximumFrameLength = 1024,
  allowTruncation = true
)

val result: Source[ByteString, Future[IOResult]] = source.via(splitter)

maximumFrameLength参数决定一行的最大长度;您可以将其设置为 Int.MaxValue 以获得基本上无限的行长度(尽管如果您的 CSV 行很长可能会很危险),并且 allowTruncation 设置为 true 以允许大小写当您的 CSV 文件末尾没有新行时。

result 源在具体化时将生成对应于每一行的 ByteString,其中没有换行符。如果您希望您的文件包含 Windows 行分隔符(“\r\n”),那么您需要手动 trim 这些字符串。