来自 ftp 的 Akka 流,逐行
Akka stream from ftp, line by line
我正在尝试使用 alpakka 和 scala 流从 ftp 服务器读取文件。
我从 Ftp.fromPath(...)
得到的类型是 Source[ByteString, Future[IOResult]]
。我想逐行读取文件(这是一个 CSV 文件),但我不知道如何。
如有任何帮助,我将不胜感激。
有一种按行拆分 Source[ByteString, _]
的标准方法,称为 Framing.delimiter
。可以这样使用:
val source: Source[ByteString, Future[IOResult]] = Ftp.fromPath(...)
val splitter = Framing.delimiter(
ByteString("\n"),
maximumFrameLength = 1024,
allowTruncation = true
)
val result: Source[ByteString, Future[IOResult]] = source.via(splitter)
maximumFrameLength
参数决定一行的最大长度;您可以将其设置为 Int.MaxValue
以获得基本上无限的行长度(尽管如果您的 CSV 行很长可能会很危险),并且 allowTruncation
设置为 true
以允许大小写当您的 CSV 文件末尾没有新行时。
result
源在具体化时将生成对应于每一行的 ByteString
,其中没有换行符。如果您希望您的文件包含 Windows 行分隔符(“\r\n”),那么您需要手动 trim 这些字符串。
我正在尝试使用 alpakka 和 scala 流从 ftp 服务器读取文件。
我从 Ftp.fromPath(...)
得到的类型是 Source[ByteString, Future[IOResult]]
。我想逐行读取文件(这是一个 CSV 文件),但我不知道如何。
如有任何帮助,我将不胜感激。
有一种按行拆分 Source[ByteString, _]
的标准方法,称为 Framing.delimiter
。可以这样使用:
val source: Source[ByteString, Future[IOResult]] = Ftp.fromPath(...)
val splitter = Framing.delimiter(
ByteString("\n"),
maximumFrameLength = 1024,
allowTruncation = true
)
val result: Source[ByteString, Future[IOResult]] = source.via(splitter)
maximumFrameLength
参数决定一行的最大长度;您可以将其设置为 Int.MaxValue
以获得基本上无限的行长度(尽管如果您的 CSV 行很长可能会很危险),并且 allowTruncation
设置为 true
以允许大小写当您的 CSV 文件末尾没有新行时。
result
源在具体化时将生成对应于每一行的 ByteString
,其中没有换行符。如果您希望您的文件包含 Windows 行分隔符(“\r\n”),那么您需要手动 trim 这些字符串。