akka.io 2.3逐行接收

akka.io receive line by line in 2.3

我正在使用 Akka 2.3(因为这是 Play 附带的版本)并且想要连接到某个 TCP 套接字。我知道 akka.io 包。但是,我看不到任何方法可以将接收到的数据作为 UTF-8 字符串 逐行处理 (反对只接收字节块)。

在网上搜索时,有很多对 Akka 2.2 的实验管道 API 的引用。然而,这个 API 在 Akka 中又被删除了。

我正在寻找的是在大多数缓冲区 类 中称为 readLine 的东西,但对于 Akka I/O 框架。

旧管道的替代品是Akka Streams,它提供 TCP 实现。可以向此类流添加行解析器以轻松获得 "stream of lines".

很快将合并内置的行解析器+str #17310: Basic framing support. Currently the line parser is a cookbook you can paste into your project: Akka Streams Cookbook: Parsing lines from ByteStrings

我建议您先阅读 Working with Streaming IO 文档,然后修改示例以使其符合您的用例(通过使用 parseLines 食谱食谱)。

请注意,Akka Streams 仍处于实验阶段,API 可能会略有变化。 另一个重要信息是它们实现了 www.reactive-streams.org 规范,因此您可以开箱即用地获得背压,而不必手动担心它:-)

Akka Stream 看起来很有前途,但是由于它仍未发布,我决定通过简单地联系缓冲区中的所有数据并等待分隔符来自己实现它。

  private val separatorBytes = // like CRLF
  private var buffer = ByteString.empty
  private var nextPossibleMatch = 0

  // when receiving chunks of bytes they are appended to buffer and doParseLines is executed

  private def doParseLines(parsedLinesSoFar: Vector[String] = Vector()): Vector[String] = {
    val possibleMatchPos = buffer.indexOf(separatorBytes.head, from = nextPossibleMatch)
    if (possibleMatchPos == -1) {
      parsedLinesSoFar
    } else {
      if (possibleMatchPos + separatorBytes.size > buffer.size) {
        nextPossibleMatch = possibleMatchPos
        parsedLinesSoFar
      } else {
        if (buffer.slice(possibleMatchPos, possibleMatchPos + separatorBytes.size) == separatorBytes) {
          // Found a match
          val parsedLine = buffer.slice(0, possibleMatchPos).utf8String
          buffer = buffer.drop(possibleMatchPos + separatorBytes.size)
          nextPossibleMatch -= possibleMatchPos + separatorBytes.size
          doParseLines(parsedLinesSoFar :+ parsedLine)
        } else {
          nextPossibleMatch += 1
          doParseLines(parsedLinesSoFar)
        }
      }
    }
  }