Akka actor - 发件人指向死信
Akka actor - sender points to dead letters
考虑以下示例:
case class Payload(message: String, async: Boolean)
class EchoActor extends Actor {
override def receive: Receive = {
case Payload(message, async) =>
if (async) Future {
println(s"from: $sender")
sender ! message
} else {
println(s"from: $sender")
sender ! message
}
}
}
def main(args: Array[String]): Unit = {
val system = ActorSystem("demo")
val echo = system.actorOf(Props[EchoActor])
implicit val timeout = Timeout(2 seconds)
(echo ? Payload("Hello", async = false)).mapTo[String].foreach(println(_))
(echo ? Payload("Async Hello", async = true)).mapTo[String].foreach(println(_))
StdIn.readLine()
system.terminate()
}
控制台输出:
from: Actor[akka://demo/temp/$a]
Hello
from: Actor[akka://demo/deadLetters]
[INFO] [04/13/2017 19:56:58.516] [demo-akka.actor.default-dispatcher-4] [akka://demo/deadLetters] Message [java.lang.String] from Actor[akka://demo/user/$a#2112869650] to Actor[akka://demo/deadLetters] was not delivered. [1] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.
即从另一个线程访问它时,发件人指向 deadLetters
。
这背后的原因是什么?
这是一个错误吗?
不过,我们可以保留对实际发件人的引用以使其工作:
if (async) {
val currentSender = sender()
Future {
println(s"from: $currentSender")
currentSender ! message
}
}
但是...没有更好的方法吗?
这不是错误,而是记录在案的行为 -
http://doc.akka.io/docs/akka/2.5.0/scala/actors.html#Send_messages
使用 Future
意味着调用不是 Actor
class 实例的匿名函数,因此您的 sender()
ref 被映射到 deadLetters
邮箱
更好的方法是 pipe
模式。
import akka.pattern.pipe
class EchoActor extends Actor {
override def receive: Receive = {
case Payload(message, async) =>
if (async) {
Future {
message
}.pipeTo(sender)
} else {
sender ! message
}
}
}
问题是 sender
是一个函数,它的值只有在 在处理传入消息的同一线程 上调用时才有效。当您在未来调用 sender
时,它是从另一个线程和另一个时间点调用的,特别是在 actor 的接收函数已经返回之后。
pipeTo
在接收函数返回之前,在与 actor 相同的线程上捕获当前发送者的值。这实际上与您使用 currentSender
值的方法相同。
考虑以下示例:
case class Payload(message: String, async: Boolean)
class EchoActor extends Actor {
override def receive: Receive = {
case Payload(message, async) =>
if (async) Future {
println(s"from: $sender")
sender ! message
} else {
println(s"from: $sender")
sender ! message
}
}
}
def main(args: Array[String]): Unit = {
val system = ActorSystem("demo")
val echo = system.actorOf(Props[EchoActor])
implicit val timeout = Timeout(2 seconds)
(echo ? Payload("Hello", async = false)).mapTo[String].foreach(println(_))
(echo ? Payload("Async Hello", async = true)).mapTo[String].foreach(println(_))
StdIn.readLine()
system.terminate()
}
控制台输出:
from: Actor[akka://demo/temp/$a]
Hello
from: Actor[akka://demo/deadLetters]
[INFO] [04/13/2017 19:56:58.516] [demo-akka.actor.default-dispatcher-4] [akka://demo/deadLetters] Message [java.lang.String] from Actor[akka://demo/user/$a#2112869650] to Actor[akka://demo/deadLetters] was not delivered. [1] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.
即从另一个线程访问它时,发件人指向 deadLetters
。
这背后的原因是什么? 这是一个错误吗?
不过,我们可以保留对实际发件人的引用以使其工作:
if (async) {
val currentSender = sender()
Future {
println(s"from: $currentSender")
currentSender ! message
}
}
但是...没有更好的方法吗?
这不是错误,而是记录在案的行为 -
http://doc.akka.io/docs/akka/2.5.0/scala/actors.html#Send_messages
使用 Future
意味着调用不是 Actor
class 实例的匿名函数,因此您的 sender()
ref 被映射到 deadLetters
邮箱
更好的方法是 pipe
模式。
import akka.pattern.pipe
class EchoActor extends Actor {
override def receive: Receive = {
case Payload(message, async) =>
if (async) {
Future {
message
}.pipeTo(sender)
} else {
sender ! message
}
}
}
问题是 sender
是一个函数,它的值只有在 在处理传入消息的同一线程 上调用时才有效。当您在未来调用 sender
时,它是从另一个线程和另一个时间点调用的,特别是在 actor 的接收函数已经返回之后。
pipeTo
在接收函数返回之前,在与 actor 相同的线程上捕获当前发送者的值。这实际上与您使用 currentSender
值的方法相同。