使用 Akka Actors 处理多个 TCP 连接

Handling multiple TCP connections with Akka Actors

我正在尝试使用 akka 参与者设置一个简单的 TCP 服务器,它应该允许多个客户端同时连接。我将我的问题简化为以下简单程序:

package actorfail
import akka.actor._, akka.io._, akka.util._
import scala.collection.mutable._
import java.net._

case class Foo()

class ConnHandler(conn: ActorRef) extends Actor {
  def receive = {
    case Foo() => conn ! Tcp.Write(ByteString("foo\n"))
  }
}

class Server(conns: ArrayBuffer[ActorRef]) extends Actor {
  import context.system
  println("Listing on 127.0.0.1:9191")
  IO(Tcp) ! Tcp.Bind(self, new InetSocketAddress("127.0.0.1", 9191))
  def receive = {
    case Tcp.Connected(remote, local) =>
      val handler = context.actorOf(Props(new ConnHandler(sender)))
      sender ! Tcp.Register(handler)
      conns.append(handler)
  }
}

object Main {
  def main(args: Array[String]) {
    implicit val system = ActorSystem("Test")
    val conns = new ArrayBuffer[ActorRef]()
    val server = system.actorOf(Props(new Server(conns)))
    while (true)  {
      println(s"Sending some foos")
      for (c <- conns) c ! Foo()
      Thread.sleep(1000)
    }
  }
}

它绑定到 localhost:9191 并接受多个连接,将连接处理程序添加到全局数组并定期向每个连接发送字符串 "foo"。现在,当我尝试同时连接多个客户端时,只有第一个客户端获得 "foo"s。当我打开第二个连接时,它没有发送任何 foo,而是收到以下类型的日志消息:

Sending some foos
[INFO] [03/27/2015 21:24:07.331] [Test-akka.actor.default-dispatcher-6] [akka://Test/deadLetters] Message [akka.io.Tcp$Write] from Actor[akka://Test/user/$a/$b#-308726290] to Actor[akka://Test/deadLetters] was not delivered. [7] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.

我知道这意味着我们尝试向其发送 Tcp.Write 命令的目标参与者不再接受消息。但这是为什么呢?你能帮我理解根本问题吗?我怎样才能使这项工作?

上面的代码有两个问题:

  • 在 actor 消息中发送可变状态并以非线程安全的方式改变它
  • 在 Props 中包含不稳定的引用

在我详细说明之前,请考虑阅读文档,here and here,这些都包含在那里。

可变消息

ArrayBuffer 不是线程安全的,但是您将它从主例程传递给不同的参与者,然后他们独立(同时)修改它。这将导致更新丢失或数据结构本身损坏。另一方面,如果没有适当的同步,则不能保证主线程会看到修改,因为编译器原则上可以确定缓冲区在 while 循环内不会发生变化,并相应地优化代码。

Actor 不依赖于共享的可变状态,而是只发送消息。在这种情况下,解决方案是将 while 循环提升到一个参与者中(但在一秒钟后安排消息到 self 而不是阻塞 Thread.sleep(1000) 调用)。然后连接处理程序只需要为这个 foo 发送者 actor 传递 ActorRef,他们会向它发送一条消息来注册自己,然后该 actor 将活动连接列表保存在它封装的范围内。这样做的好处是您可以使用 DeathWatch 在连接终止时删除连接。

道具中的不稳定引用

有问题的代码:Props(new ConnHandler(sender))

Props 是从一个 actor 工厂构造的,在这种情况下它被当作一个名字参数;整个 new 表达式将在稍后评估,每当这样的 actor 被初始化时——可能在不同的线程上。这意味着 sender 也会在稍后从该执行上下文中评估,因此它可能是 deadLetters(如果父 actor 当前不是 运行——如果是,sender 可能会完全指向错误的演员)。

此处的解决方案已记录 here