Dead Letters 使用 Akka 创建消息环
Dead Letters using Akka to create a message ring
我正在尝试使用远程参与者创建示例 Akka 应用程序。目标是创建例如 16 个按顺序交换消息的 actor(actor 16 与 actor 15、15 至 14 等对话,1 与 actor 16 对话)。但是,我在通信方面遇到了问题,因为我不断遇到此错误。
[INFO] [05/04/2017 15:45:58.248]
[ActorFlasks-akka.actor.default-dispatcher-4]
[akka://ActorFlasks/deadLetters] Message [java.lang.String] from
Actor[akka://ActorFlasks/user/16#-2022012132] to
Actor[akka://ActorFlasks/deadLetters] was not delivered. [1] dead
letters encountered.
为此,我 运行 应用程序的 16 个终端实例,始终具有不同的配置文件。我在每个实例中都创建了 actorsystem,如下所示:
object Main extends App {
val localId = args(0)
val configFile = getClass.getClassLoader.getResource(s"application$localId.conf").getFile
val config = ConfigFactory.parseFile(new File(configFile))
val system = ActorSystem("ActorFlasks" , config)
val remote = system.actorOf(Props[CyclonManager], name=localId)
remote ! "START"
}
一个配置文件的例子是这样的:
akka {
actor {
provider = remote
}
remote {
enabled-transports = ["akka.remote.netty.tcp"]
netty.tcp {
hostname = "localhost"
port = 50001
}
}
}
演员定义如下:
class CyclonManager extends Actor {
def propagateMessage(): Unit = {
val localId = self.path.name.toInt
val currentPort = 50000 + localId
val nextHopPort = if (currentPort == 50001) 50016 else currentPort - 1
val nextHopId = localId-1
val nextHopRef = context.actorSelection(s"akka.tcp://ActorFlasks@localhost:$nextHopPort/user/$nextHopId")
nextHopRef ! "NEXT"
}
override def receive: Receive = {
case "START" =>
if (self.path.name == "16") {
propagateMessage()
}
case "NEXT" =>
propagateMessage()
case _ =>
println("Unrecognized message")
}
}
这是一个让我入门的简单示例,但无论我如何尝试,它都无法正常工作。有人知道我哪里失败了吗?
提前谢谢你,
编辑:
akka {
actor {
provider = "akka.remote.RemoteActorRefProvider"
}
remote {
enabled-transports = ["akka.remote.netty.tcp"]
netty.tcp {
hostname = "localhost"
port = 50015
}
}
}
重构并运行使用您的示例后,我在 propagateMessage 函数中发现了一个错误。
val nextHopId = localId-1
应该是
val nextHopId = if (currentPort == 50001) 16 else localId-1
如果它不能解决您的问题,请尝试 运行宁我的快速而肮脏但有效的代码,看看它与您的代码有何不同:https://gist.github.com/grantzvolsky/4a53ce78610038a9d44788d7151dc416
在我的代码中,我只使用了演员 14、15 和 16。您可以 运行 每个都使用 sbt "run 16"
,等等
我正在尝试使用远程参与者创建示例 Akka 应用程序。目标是创建例如 16 个按顺序交换消息的 actor(actor 16 与 actor 15、15 至 14 等对话,1 与 actor 16 对话)。但是,我在通信方面遇到了问题,因为我不断遇到此错误。
[INFO] [05/04/2017 15:45:58.248] [ActorFlasks-akka.actor.default-dispatcher-4] [akka://ActorFlasks/deadLetters] Message [java.lang.String] from Actor[akka://ActorFlasks/user/16#-2022012132] to Actor[akka://ActorFlasks/deadLetters] was not delivered. [1] dead letters encountered.
为此,我 运行 应用程序的 16 个终端实例,始终具有不同的配置文件。我在每个实例中都创建了 actorsystem,如下所示:
object Main extends App {
val localId = args(0)
val configFile = getClass.getClassLoader.getResource(s"application$localId.conf").getFile
val config = ConfigFactory.parseFile(new File(configFile))
val system = ActorSystem("ActorFlasks" , config)
val remote = system.actorOf(Props[CyclonManager], name=localId)
remote ! "START"
}
一个配置文件的例子是这样的:
akka {
actor {
provider = remote
}
remote {
enabled-transports = ["akka.remote.netty.tcp"]
netty.tcp {
hostname = "localhost"
port = 50001
}
}
}
演员定义如下:
class CyclonManager extends Actor {
def propagateMessage(): Unit = {
val localId = self.path.name.toInt
val currentPort = 50000 + localId
val nextHopPort = if (currentPort == 50001) 50016 else currentPort - 1
val nextHopId = localId-1
val nextHopRef = context.actorSelection(s"akka.tcp://ActorFlasks@localhost:$nextHopPort/user/$nextHopId")
nextHopRef ! "NEXT"
}
override def receive: Receive = {
case "START" =>
if (self.path.name == "16") {
propagateMessage()
}
case "NEXT" =>
propagateMessage()
case _ =>
println("Unrecognized message")
}
}
这是一个让我入门的简单示例,但无论我如何尝试,它都无法正常工作。有人知道我哪里失败了吗?
提前谢谢你,
编辑:
akka {
actor {
provider = "akka.remote.RemoteActorRefProvider"
}
remote {
enabled-transports = ["akka.remote.netty.tcp"]
netty.tcp {
hostname = "localhost"
port = 50015
}
}
}
重构并运行使用您的示例后,我在 propagateMessage 函数中发现了一个错误。
val nextHopId = localId-1
应该是
val nextHopId = if (currentPort == 50001) 16 else localId-1
如果它不能解决您的问题,请尝试 运行宁我的快速而肮脏但有效的代码,看看它与您的代码有何不同:https://gist.github.com/grantzvolsky/4a53ce78610038a9d44788d7151dc416
在我的代码中,我只使用了演员 14、15 和 16。您可以 运行 每个都使用 sbt "run 16"
,等等