akka.io 2.3逐行接收
akka.io receive line by line in 2.3
我正在使用 Akka 2.3(因为这是 Play 附带的版本)并且想要连接到某个 TCP 套接字。我知道 akka.io
包。但是,我看不到任何方法可以将接收到的数据作为 UTF-8 字符串 逐行处理 (反对只接收字节块)。
在网上搜索时,有很多对 Akka 2.2 的实验管道 API 的引用。然而,这个 API 在 Akka 中又被删除了。
我正在寻找的是在大多数缓冲区 类 中称为 readLine
的东西,但对于 Akka I/O 框架。
旧管道的替代品是Akka Streams,它提供流 TCP 实现。可以向此类流添加行解析器以轻松获得 "stream of lines".
很快将合并内置的行解析器+str #17310: Basic framing support. Currently the line parser is a cookbook you can paste into your project: Akka Streams Cookbook: Parsing lines from ByteStrings。
我建议您先阅读 Working with Streaming IO 文档,然后修改示例以使其符合您的用例(通过使用 parseLines
食谱食谱)。
请注意,Akka Streams 仍处于实验阶段,API 可能会略有变化。
另一个重要信息是它们实现了 www.reactive-streams.org 规范,因此您可以开箱即用地获得背压,而不必手动担心它:-)
Akka Stream 看起来很有前途,但是由于它仍未发布,我决定通过简单地联系缓冲区中的所有数据并等待分隔符来自己实现它。
private val separatorBytes = // like CRLF
private var buffer = ByteString.empty
private var nextPossibleMatch = 0
// when receiving chunks of bytes they are appended to buffer and doParseLines is executed
private def doParseLines(parsedLinesSoFar: Vector[String] = Vector()): Vector[String] = {
val possibleMatchPos = buffer.indexOf(separatorBytes.head, from = nextPossibleMatch)
if (possibleMatchPos == -1) {
parsedLinesSoFar
} else {
if (possibleMatchPos + separatorBytes.size > buffer.size) {
nextPossibleMatch = possibleMatchPos
parsedLinesSoFar
} else {
if (buffer.slice(possibleMatchPos, possibleMatchPos + separatorBytes.size) == separatorBytes) {
// Found a match
val parsedLine = buffer.slice(0, possibleMatchPos).utf8String
buffer = buffer.drop(possibleMatchPos + separatorBytes.size)
nextPossibleMatch -= possibleMatchPos + separatorBytes.size
doParseLines(parsedLinesSoFar :+ parsedLine)
} else {
nextPossibleMatch += 1
doParseLines(parsedLinesSoFar)
}
}
}
}
我正在使用 Akka 2.3(因为这是 Play 附带的版本)并且想要连接到某个 TCP 套接字。我知道 akka.io
包。但是,我看不到任何方法可以将接收到的数据作为 UTF-8 字符串 逐行处理 (反对只接收字节块)。
在网上搜索时,有很多对 Akka 2.2 的实验管道 API 的引用。然而,这个 API 在 Akka 中又被删除了。
我正在寻找的是在大多数缓冲区 类 中称为 readLine
的东西,但对于 Akka I/O 框架。
旧管道的替代品是Akka Streams,它提供流 TCP 实现。可以向此类流添加行解析器以轻松获得 "stream of lines".
很快将合并内置的行解析器+str #17310: Basic framing support. Currently the line parser is a cookbook you can paste into your project: Akka Streams Cookbook: Parsing lines from ByteStrings。
我建议您先阅读 Working with Streaming IO 文档,然后修改示例以使其符合您的用例(通过使用 parseLines
食谱食谱)。
请注意,Akka Streams 仍处于实验阶段,API 可能会略有变化。 另一个重要信息是它们实现了 www.reactive-streams.org 规范,因此您可以开箱即用地获得背压,而不必手动担心它:-)
Akka Stream 看起来很有前途,但是由于它仍未发布,我决定通过简单地联系缓冲区中的所有数据并等待分隔符来自己实现它。
private val separatorBytes = // like CRLF
private var buffer = ByteString.empty
private var nextPossibleMatch = 0
// when receiving chunks of bytes they are appended to buffer and doParseLines is executed
private def doParseLines(parsedLinesSoFar: Vector[String] = Vector()): Vector[String] = {
val possibleMatchPos = buffer.indexOf(separatorBytes.head, from = nextPossibleMatch)
if (possibleMatchPos == -1) {
parsedLinesSoFar
} else {
if (possibleMatchPos + separatorBytes.size > buffer.size) {
nextPossibleMatch = possibleMatchPos
parsedLinesSoFar
} else {
if (buffer.slice(possibleMatchPos, possibleMatchPos + separatorBytes.size) == separatorBytes) {
// Found a match
val parsedLine = buffer.slice(0, possibleMatchPos).utf8String
buffer = buffer.drop(possibleMatchPos + separatorBytes.size)
nextPossibleMatch -= possibleMatchPos + separatorBytes.size
doParseLines(parsedLinesSoFar :+ parsedLine)
} else {
nextPossibleMatch += 1
doParseLines(parsedLinesSoFar)
}
}
}
}