是否从 akka i/o 中删除了管道?

Were pipelines removed from akka i/o?

在学习如何使用 akka I/O 时,我正在尝试在 akka i/o 之上实现一个简单的协议,并遵循文档 here.

但是在我的 gradle 文件中,我使用的是版本 2.3.9,如下所示

dependencies {
    compile group: 'org.slf4j', name: 'slf4j-log4j12', version: '1.7.7'
    compile group: 'com.typesafe.akka', name: 'akka-actor_2.11', version: '2.3.9'
    compile group: 'com.typesafe.akka', name: 'akka-contrib_2.11', version: '2.3.9'
    compile group: 'org.scala-lang', name: 'scala-library', version: '2.11.5'
    testCompile group: 'junit', name: 'junit', version: '4.11'
}

导入一些管道特定的东西,比如

import akka.io.SymmetricPipelineStage;
import akka.io.PipelineContext;
import akka.io.SymmetricPipePair;

生成无法解决符号错误。

因此我的问题。

  1. 是否删除了这些,或者我需要将一些依赖项添加到我的 gradle 文件中。
  2. 如果去掉,encod/decode阶段会如何处理?

管道是实验性的,确实在 Akka 2.3 中被删除了。 删除记录在 Migration Guide 2.2.x to 2.3.x.

还提到能够将 "older" 管道实现与 Akka 2.3 here 打包在一起,尽管它似乎不是简单地添加依赖项。

我敢打赌,Akka Streams 旨在更好地替代管道,在 Akka 2.4 中出现,但现在可以作为 experimental module 使用。 encode/decode 阶段或协议层可以通过结合使用 Akka Streams 和 Akka I/O.

来处理

是的,没有任何替代方案就删除了管道。我来自 Netty 世界,没有找到管道 "unintuitive" - 它们积累缓冲区并为儿童演员提供随时可用的消息。

看看我们的解决方案,它需要 "org.scalaz" %% "scalaz-core" % 7.2.14 作为依赖项。

Codec class 是一个 State monad,它被 actor 调用并产生输出。在我们的项目中,我们使用 Varint32 protobuf encoding,因此每条消息都以 varint32 长度字段作为前缀:

import com.google.protobuf.CodedInputStream
import com.trueaccord.scalapb.{GeneratedMessage, GeneratedMessageCompanion, Message}
import com.zeptolab.tlc.front.codecs.Varint32ProtoCodec.ProtoMessage

import scalaz.{-\/, State, \/, \/-}

trait Accumulator
trait Codec[IN, OUT] {

  type Stream = State[Accumulator, Seq[IN]]

  def decode(buffer: Array[Byte]): Throwable \/ IN

  def encode(message: OUT): Array[Byte]

  def emptyAcc: Accumulator

  def decodeStream(data: Array[Byte]): Stream

}

object Varint32ProtoCodec {

  type ProtoMessage[T] = GeneratedMessage with Message[T]

  def apply[IN <: ProtoMessage[IN], OUT <: ProtoMessage[OUT]](protoType: GeneratedMessageCompanion[IN]) = new Varint32ProtoCodec[IN, OUT](protoType)

}

class Varint32ProtoCodec[IN <: ProtoMessage[IN], OUT <: ProtoMessage[OUT]](protoType: GeneratedMessageCompanion[IN]) extends Codec[IN, OUT] {

  import com.google.protobuf.CodedOutputStream

  private case class AccumulatorImpl(expected: Int = -1, buffer: Array[Byte] = Array.empty) extends Accumulator

  override def emptyAcc: Accumulator = AccumulatorImpl()

  override def decode(buffer: Array[Byte]): Throwable \/ IN = {
    \/.fromTryCatchNonFatal {
      val dataLength = CodedInputStream.newInstance(buffer).readRawVarint32()
      val bufferLength = buffer.length
      val dataBuffer = buffer.drop(bufferLength - dataLength)
      protoType.parseFrom(dataBuffer)
    }
  }

  override def encode(message: OUT): Array[Byte] = {
    val messageBuf = message.toByteArray
    val messageBufLength = messageBuf.length
    val prependLength = CodedOutputStream.computeUInt32SizeNoTag(messageBufLength)
    val prependLengthBuffer = new Array[Byte](prependLength)
    CodedOutputStream.newInstance(prependLengthBuffer).writeUInt32NoTag(messageBufLength)
    prependLengthBuffer ++ messageBuf
  }

  override def decodeStream(data: Array[Byte]): Stream = State {
    case acc: AccumulatorImpl =>
      if (data.isEmpty) {
        (acc, Seq.empty)
      } else {
        val accBuffer = acc.buffer ++ data
        val accExpected = readExpectedLength(accBuffer, acc)
        if (accBuffer.length >= accExpected) {
          val (frameBuffer, restBuffer) = accBuffer.splitAt(accExpected)
          val output = decode(frameBuffer) match {
            case \/-(proto) => Seq(proto)
            case -\/(_) => Seq.empty
          }
          val (newAcc, recOutput) = decodeStream(restBuffer).run(emptyAcc)
          (newAcc, output ++ recOutput)
        } else (AccumulatorImpl(accExpected, accBuffer), Seq.empty)
      }
    case _ => (emptyAcc, Seq.empty)
  }

  private def readExpectedLength(data: Array[Byte], acc: AccumulatorImpl) = {
    if (acc.expected == -1 && data.length >= 1) {
      \/.fromTryCatchNonFatal {
        val is = CodedInputStream.newInstance(data)
        val dataLength = is.readRawVarint32()
        val tagLength = is.getTotalBytesRead
        dataLength + tagLength
      }.getOrElse(acc.expected)
    } else acc.expected
  }

}

演员是:

import akka.actor.{Actor, ActorRef, Props}
import akka.event.Logging
import akka.util.ByteString
import com.zeptolab.tlc.front.codecs.{Accumulator, Varint32ProtoCodec}
import com.zeptolab.tlc.proto.protocol.{Downstream, Upstream}

object FrameCodec {
  def props() = Props[FrameCodec]
}

class FrameCodec extends Actor {

  import akka.io.Tcp._

  private val logger       = Logging(context.system, this)
  private val codec        = Varint32ProtoCodec[Upstream, Downstream](Upstream)
  private val sessionActor = context.actorOf(Session.props())

  def receive = {
    case r: Received =>
      context become stream(sender(), codec.emptyAcc)
      self ! r
    case PeerClosed => peerClosed()
  }

  private def stream(ioActor: ActorRef, acc: Accumulator): Receive = {
    case Received(data) =>
      val (next, output) = codec.decodeStream(data.toArray).run(acc)
      output.foreach { up =>
        sessionActor ! up
      }
      context become stream(ioActor, next)
    case d: Downstream =>
      val buffer = codec.encode(d)
      ioActor ! Write(ByteString(buffer))
    case PeerClosed => peerClosed()
  }

  private def peerClosed() = {
    logger.info("Connection closed")
    context stop self
  }

}