AKKA FileIO 流解析为换行符和 EOF
AKKA FileIO Stream Parsing to Newline & EOF
我正在使用 Akka FileIO(在 scala 中)创建一个文件解析器,该文件解析器旨在从输入文件中读取每一行并应用一个简单的接收器。除文件中以 EOF 结尾的最后一行外,每一行都用换行符 ('\n') 分隔。
如何处理换行符和 eof 定界,以便我可以可靠地读取最后一行,而不必依赖于最后的 '/n' 字符?
var rowNum = 0
val simpleMsgSink: Sink[String, Future[Done]] =
Sink.foreach {
case row: String => {
println(s"$rowNum: $row")
rowNum = rowNum+1
}
}
val source = FileIO.fromPath(file, 1 * 1024 * 1024 )
.via(Framing.delimiter(ByteString("\n"), maximumFrameLength = 1024))
.map(_.utf8String)
.runWith(simpleMsgSink)
如果对文件执行此操作(最后一行末尾没有换行符):
Sensor_ID,Location,Seqno,gwrx.time,Temp,Humidity,Noise,CO2,Water
A0890,"51.645368, 0.072211",1,42793.00278,16,48,36,325,0
A0891,"51.645370, 0.072300",1,42793.00278,15,41,34,353,3
输出为:
0: Sensor_ID,Location,Seqno,gwrx.time,Temp,Humidity,Noise,CO2,Water
1: A0890,"51.645368, 0.072211",1,42793.00278,16,48,36,325,0
我如何拾取最后一行?
如果你看一下Framing.delimiter
的scala doc,你会发现它实际上有第三个参数:allowTruncation
,默认值为false
。以下是 scaladoc 对它的描述:
If false
, then when the last frame being decoded contains no valid delimiter this Flow fails the stream instead of returning a truncated frame.
所以你所要做的就是添加缺少的参数:
Framing.delimiter(ByteString("\n"), maximumFrameLength = 1024, allowTruncation = true)
我正在使用 Akka FileIO(在 scala 中)创建一个文件解析器,该文件解析器旨在从输入文件中读取每一行并应用一个简单的接收器。除文件中以 EOF 结尾的最后一行外,每一行都用换行符 ('\n') 分隔。
如何处理换行符和 eof 定界,以便我可以可靠地读取最后一行,而不必依赖于最后的 '/n' 字符?
var rowNum = 0
val simpleMsgSink: Sink[String, Future[Done]] =
Sink.foreach {
case row: String => {
println(s"$rowNum: $row")
rowNum = rowNum+1
}
}
val source = FileIO.fromPath(file, 1 * 1024 * 1024 )
.via(Framing.delimiter(ByteString("\n"), maximumFrameLength = 1024))
.map(_.utf8String)
.runWith(simpleMsgSink)
如果对文件执行此操作(最后一行末尾没有换行符):
Sensor_ID,Location,Seqno,gwrx.time,Temp,Humidity,Noise,CO2,Water
A0890,"51.645368, 0.072211",1,42793.00278,16,48,36,325,0
A0891,"51.645370, 0.072300",1,42793.00278,15,41,34,353,3
输出为:
0: Sensor_ID,Location,Seqno,gwrx.time,Temp,Humidity,Noise,CO2,Water
1: A0890,"51.645368, 0.072211",1,42793.00278,16,48,36,325,0
我如何拾取最后一行?
如果你看一下Framing.delimiter
的scala doc,你会发现它实际上有第三个参数:allowTruncation
,默认值为false
。以下是 scaladoc 对它的描述:
If
false
, then when the last frame being decoded contains no valid delimiter this Flow fails the stream instead of returning a truncated frame.
所以你所要做的就是添加缺少的参数:
Framing.delimiter(ByteString("\n"), maximumFrameLength = 1024, allowTruncation = true)