Dead Letters 使用 Akka 创建消息环

Dead Letters using Akka to create a message ring

我正在尝试使用远程参与者创建示例 Akka 应用程序。目标是创建例如 16 个按顺序交换消息的 actor(actor 16 与 actor 15、15 至 14 等对话,1 与 actor 16 对话)。但是,我在通信方面遇到了问题,因为我不断遇到此错误。

[INFO] [05/04/2017 15:45:58.248] [ActorFlasks-akka.actor.default-dispatcher-4] [akka://ActorFlasks/deadLetters] Message [java.lang.String] from Actor[akka://ActorFlasks/user/16#-2022012132] to Actor[akka://ActorFlasks/deadLetters] was not delivered. [1] dead letters encountered.

为此,我 运行 应用程序的 16 个终端实例,始终具有不同的配置文件。我在每个实例中都创建了 actorsystem,如下所示:

object Main extends App {

    val localId = args(0)

    val configFile = getClass.getClassLoader.getResource(s"application$localId.conf").getFile
    val config = ConfigFactory.parseFile(new File(configFile))
    val system = ActorSystem("ActorFlasks" , config)
    val remote = system.actorOf(Props[CyclonManager], name=localId)

    remote ! "START"
}

一个配置文件的例子是这样的:

akka {
  actor {
    provider = remote
  }
  remote {
    enabled-transports = ["akka.remote.netty.tcp"]
    netty.tcp {
      hostname = "localhost"
      port = 50001
    }
 }
}

演员定义如下:

class CyclonManager extends Actor {

  def propagateMessage(): Unit = {
    val localId = self.path.name.toInt
    val currentPort = 50000 + localId
    val nextHopPort = if (currentPort == 50001) 50016 else currentPort - 1
    val nextHopId = localId-1

    val nextHopRef = context.actorSelection(s"akka.tcp://ActorFlasks@localhost:$nextHopPort/user/$nextHopId")

    nextHopRef ! "NEXT"
  }

  override def receive: Receive = {
    case "START" =>
      if (self.path.name == "16") {
        propagateMessage()
      }
    case "NEXT" =>
      propagateMessage()
    case _ =>
      println("Unrecognized message")
  }
}

这是一个让我入门的简单示例,但无论我如何尝试,它都无法正常工作。有人知道我哪里失败了吗?

提前谢谢你,

编辑:

akka {
  actor {
    provider = "akka.remote.RemoteActorRefProvider"
  }
  remote {
    enabled-transports = ["akka.remote.netty.tcp"]
    netty.tcp {
      hostname = "localhost"
      port = 50015
    }
 }
}

重构并运行使用您的示例后,我在 propagateMessage 函数中发现了一个错误。

val nextHopId = localId-1

应该是

val nextHopId = if (currentPort == 50001) 16 else localId-1

如果它不能解决您的问题,请尝试 运行宁我的快速而肮脏但有效的代码,看看它与您的代码有何不同:https://gist.github.com/grantzvolsky/4a53ce78610038a9d44788d7151dc416

在我的代码中,我只使用了演员 14、15 和 16。您可以 运行 每个都使用 sbt "run 16",等等