Akka round-robin:从远程路由发送响应给发送者
Akka round-robin: Sending response from remote routees to sender
我正在使用 Akka Cluster(版本 2.4.10),其中少数节点指定为 "front-end" 角色,少数其他节点指定为 "workers"。工人在远程机器上。传入的工作由前端参与者通过循环路由分配给工作人员。问题是将 "workers" 的响应发回给前端参与者。我可以看到工作正在由工人完成。但是 workers 发送到前端的消息没有到达并最终成为死信。我在日志中看到以下错误。
[Cluster-akka.actor.default-dispatcher-21] [akka://Cluster/deadLetters] Message [scala.collection.immutable.$colon$colon] from Actor[akka://Cluster/user] to Actor[akka://Cluster/deadLetters] was not delivered. [6] dead letters encountered.
我已经看到 this and I am following the same in my code. I have also seen this,但建议的解决方案不适用于这种情况,因为我事先不知道路由。它来自配置并且可以更改。循环路由器配置如下。
akka.actor.deployment {
/frontEnd/hm = {
router = round-robin-group
nr-of-instances = 5
routees.paths = ["/user/hmWorker"]
cluster {
enabled = on
use-role = backend
allow-local-routees = on
}
}
}
路由器在前端actor中实例化如下。
val router = context.actorOf(FromConfig.props(), name = "hm")
val controller = context.actorOf(Props(classOf[Controller], router))
控制器和工人代码如下。
// Node 1 : Controller routes requests using round-robin
class Controller(router: ActorRef) extends Actor {
val list = List("a", "b") // Assume this is a big list
val groups = list.grouped(500)
override def receive: Actor.Receive = {
val futures = groups.map(grp => (router ? Message(grp)).mapTo[List[String]]))
val future = Future.sequence(futures).map(_.flatten)
val result = Await.result(future, 50 seconds)
println(s"Result is $result")
}
}
// Node 2
class Worker extends Actor {
override def receive: Actor.Receive = {
case Message(lst) =>
val future: Future[List[String]] = // Do Something asynchronous
future onComplete {
case Success(r) => sender.!(r)(context.parent) // This message is not delivered to Controller actor.
case Failure(th) => // Error handling
}
}
}
请让我知道我在这里做错了什么。感谢您的帮助。
您不应在 Future
的回调中使用 sender()
。处理回调时,sender()
可能指的是与您收到消息时不同的内容。
考虑先在回调之外保存引用,例如:
override def receive: Actor.Receive = {
case Message(lst) =>
val future: Future[List[String]] = // Do Something asynchronous
val replyTo: ActorRef = sender()
future onComplete {
case Success(r) => replyTo.!(r)(context.parent) // This message is not delivered to Controller actor.
case Failure(th) => // Error handling
}
}
或者更好,使用管道模式:
import akka.pattern.pipe
override def receive: Actor.Receive = {
case Message(lst) =>
val future: Future[List[String]] = // Do Something asynchronous
future.pipeTo(sender())
}
我正在使用 Akka Cluster(版本 2.4.10),其中少数节点指定为 "front-end" 角色,少数其他节点指定为 "workers"。工人在远程机器上。传入的工作由前端参与者通过循环路由分配给工作人员。问题是将 "workers" 的响应发回给前端参与者。我可以看到工作正在由工人完成。但是 workers 发送到前端的消息没有到达并最终成为死信。我在日志中看到以下错误。
[Cluster-akka.actor.default-dispatcher-21] [akka://Cluster/deadLetters] Message [scala.collection.immutable.$colon$colon] from Actor[akka://Cluster/user] to Actor[akka://Cluster/deadLetters] was not delivered. [6] dead letters encountered.
我已经看到 this and I am following the same in my code. I have also seen this,但建议的解决方案不适用于这种情况,因为我事先不知道路由。它来自配置并且可以更改。循环路由器配置如下。
akka.actor.deployment {
/frontEnd/hm = {
router = round-robin-group
nr-of-instances = 5
routees.paths = ["/user/hmWorker"]
cluster {
enabled = on
use-role = backend
allow-local-routees = on
}
}
}
路由器在前端actor中实例化如下。
val router = context.actorOf(FromConfig.props(), name = "hm")
val controller = context.actorOf(Props(classOf[Controller], router))
控制器和工人代码如下。
// Node 1 : Controller routes requests using round-robin
class Controller(router: ActorRef) extends Actor {
val list = List("a", "b") // Assume this is a big list
val groups = list.grouped(500)
override def receive: Actor.Receive = {
val futures = groups.map(grp => (router ? Message(grp)).mapTo[List[String]]))
val future = Future.sequence(futures).map(_.flatten)
val result = Await.result(future, 50 seconds)
println(s"Result is $result")
}
}
// Node 2
class Worker extends Actor {
override def receive: Actor.Receive = {
case Message(lst) =>
val future: Future[List[String]] = // Do Something asynchronous
future onComplete {
case Success(r) => sender.!(r)(context.parent) // This message is not delivered to Controller actor.
case Failure(th) => // Error handling
}
}
}
请让我知道我在这里做错了什么。感谢您的帮助。
您不应在 Future
的回调中使用 sender()
。处理回调时,sender()
可能指的是与您收到消息时不同的内容。
考虑先在回调之外保存引用,例如:
override def receive: Actor.Receive = {
case Message(lst) =>
val future: Future[List[String]] = // Do Something asynchronous
val replyTo: ActorRef = sender()
future onComplete {
case Success(r) => replyTo.!(r)(context.parent) // This message is not delivered to Controller actor.
case Failure(th) => // Error handling
}
}
或者更好,使用管道模式:
import akka.pattern.pipe
override def receive: Actor.Receive = {
case Message(lst) =>
val future: Future[List[String]] = // Do Something asynchronous
future.pipeTo(sender())
}