Getting Akka dead letters when using pipeTo between two actors
I have a use case with the following actor hierarchy:
parent -> childABC -> workerchild
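The worker child does its work and sends the result to its parent (childABC, which is itself a child of parent), and childABC then sends the result back to the parent actor. I am using pipeTo for this and I am getting dead letters. Here is my code.
Parent actor: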
final case object GetFinalValue
class MyActor extends Actor {
  import context.dispatcher
  import akka.pattern.pipe
  val log = LoggerFactory.getLogger(this.getClass)
  val myManageActor = context.actorOf(Props[ManagerMyActor], "Managemyactor")
  implicit val timeout = Timeout(ReadTimeIntervalValue.getInterval(), SECONDS)
  override def receive: Receive = {
    case GetFinalValue =>
      ask(myManageActor, GetValue).pipeTo(sender())
    case message =>
      log.warn(" Unhandled message received : {}", message)
      unhandled(message)
  }
}
childABC (as per the example above):
final case object GetValue
class ManagerMyActor extends Actor {
  import context.dispatcher
  import akka.pattern.pipe
  val log = LoggerFactory.getLogger(this.getClass)
  val myTokenActor = context.actorOf(Props[TokenMyActor2], "toknMyActor2")
  implicit val timeout = Timeout(ReadTimeIntervalValue.getInterval(), SECONDS)
  override def receive: Receive = {
    case GetValue =>
      ask(myTokenActor, CalculateValue).pipeTo(sender())
    case message =>
      log.warn(" Unhandled message received : {}", message)
      unhandled(message)
  }
}
Child actor:
final case object CalculateValue
class TokenMyActor2 extends Actor {
  import context.dispatcher
  import akka.pattern.pipe
  val log = LoggerFactory.getLogger(this.getClass)
  override def receive: Receive = {
    case CalculateValue =>
      val future = Future { "get the string" }
      val bac = future.map { result =>
        sender ! result
      } //.pipeTo(sender())
    case message =>
      log.warn("Actor MyActor: Unhandled message received : {}", message)
      unhandled(message)
  }
}
def main(args: Array[String]): Unit = {
  implicit val timeout = Timeout(ReadTimeIntervalValue.getInterval(), SECONDS)
  val myActor = system.actorOf(Props[MyActor], "myActor")
  val future = ask(myActor, GetFinalValue).mapTo[String]
  future.map { str =>
    log.info("string is {}", str)
  }
}
Here are the logs:
[INFO] [akkaDeadLetter][01/12/2021 19:17:22.000] [api-akka.actor.default-dispatcher-5] [akka://api/deadLetters] Message [java.lang.String] from Actor[akka://api/user/myActor/Managemyactor/toknMyActor2#1239397461] to Actor[akka://api/deadLetters] was not delivered. [1] dead letters encountered. If this is not an expected behavior then Actor[akka://api/deadLetters] may have terminated unexpectedly. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.
[INFO] [akkaDeadLetter][01/12/2021 19:17:41.989] [api-akka.actor.default-dispatcher-7] [akka://api/deadLetters] Message [akka.actor.Status$Failure] from Actor[akka://api/user/myActor#1829301550] to Actor[akka://api/deadLetters] was not delivered. [2] dead letters encountered. If this is not an expected behavior then Actor[akka://api/deadLetters] may have terminated unexpectedly. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.
[INFO] [akkaDeadLetter][01/12/2021 19:17:41.996] [api-akka.actor.default-dispatcher-7] [akka://api/deadLetters] Message [akka.actor.Status$Failure] from Actor[akka://api/user/myActor/Managemyactor#-269929265] to Actor[akka://api/deadLetters] was not delivered. [3] dead letters encountered. If this is not an expected behavior then Actor[akka://api/deadLetters] may have terminated unexpectedly. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.
Please guide me on where I am going wrong. Or should pipeTo not be used like this? If so, what should I do to make it work?
Not sure if it is intentional, but ask(myManageActor, GetValue).pipeTo(sender()) can be implemented as forward.
class MyActor extends Actor {
  lazy val myManageActor: ActorRef = ???
  override def receive: Receive = {
    case GetFinalValue =>
      myManageActor.forward(GetValue)
  }
}
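forward is the same as tell, but it preserves the original sender of the message; target.forward(msg) has the same effect as target.tell(msg, sender()).
This can be applied to both MyActor and ManagerMyActor.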
In the case of TokenMyActor2, you should not use
future.map { result =>
  sender ! result
}
because it breaks actor encapsulation, as specified in the docs:
When using future callbacks, such as onComplete, or map such as
thenRun, or thenApply inside actors you need to carefully avoid
closing over the containing actor’s reference, i.e. do not call
methods or access mutable state on the enclosing actor from within the
callback. This would break the actor encapsulation and may introduce
synchronization bugs and race conditions because the callback will be
scheduled concurrently to the enclosing actor. Unfortunately there is
not yet a way to detect these illegal accesses at compile time. See
also: Actors and shared mutable state
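Instead, you should rely on Future(???).pipeTo(sender()), which is safe to use with sender(): the argument to pipeTo is evaluated immediately, while the actor is still processing the message, rather than later inside the future's callback.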
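To illustrate the difference, here is a toy model in plain Scala with no Akka involved. The names SenderCaptureDemo, currentSender, replyFromCallback and pipeToLike are made up for this sketch. currentSender only imitates a sender() that is valid while a message is being processed; that is an assumption about the general idea, not Akka's actual implementation.

```scala
import scala.concurrent.{Await, Future, Promise}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

// Toy stand-in for an actor (hypothetical, not Akka code).
object SenderCaptureDemo {
  // Plays the role of sender(): only meaningful during message processing.
  @volatile private var currentSender: String = "deadLetters"
  def sender(): String = currentSender

  // Unsafe: sender() is called inside the callback, which runs later.
  def replyFromCallback(work: Future[String]): Future[String] =
    work.map(result => s"$result -> ${sender()}")

  // Safe: the recipient is an ordinary argument, evaluated at the call site.
  def pipeToLike(work: Future[String], recipient: String): Future[String] =
    work.map(result => s"$result -> $recipient")

  def run(): (String, String) = {
    val gate = Promise[String]()   // lets us complete the "work" later
    currentSender = "asker"        // message processing starts
    val unsafe = replyFromCallback(gate.future)
    val safe   = pipeToLike(gate.future, sender())
    currentSender = "deadLetters"  // message processing has ended
    gate.success("result")         // the future completes only now
    (Await.result(unsafe, 1.second), Await.result(safe, 1.second))
  }
}
```

Calling SenderCaptureDemo.run() gives ("result -> deadLetters", "result -> asker"): the callback version reads the sender too late, while the version that receives the recipient as an argument keeps the original one.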
After applying these changes, the code does work as expected:
import akka.actor.{Actor, ActorRef, ActorSystem, Props}
import akka.pattern.{ask, pipe}
import akka.util.Timeout
import scala.concurrent.Future
import scala.concurrent.duration.SECONDS

case object GetFinalValue
case object GetValue
case object CalculateValue

class MyActor extends Actor {
  private val myManageActor: ActorRef =
    context.actorOf(Props[ManagerMyActor], "myManageActor")
  override def receive: Receive = { case GetFinalValue =>
    // forward keeps the original asker as the sender
    myManageActor.forward(GetValue)
  }
}

class ManagerMyActor extends Actor {
  private val myTokenActor =
    context.actorOf(Props[TokenMyActor2], "toknMyActor2")
  override def receive: Receive = { case GetValue =>
    myTokenActor.forward(CalculateValue)
  }
}

class TokenMyActor2 extends Actor {
  import context.dispatcher
  override def receive: Receive = { case CalculateValue =>
    val future = Future { "get the string" }
    // sender() is evaluated here, while the message is being processed
    future.pipeTo(sender())
  }
}

implicit val timeout = Timeout(3, SECONDS)
implicit val system = ActorSystem("adasd")
import system.dispatcher
val myActor = system.actorOf(Props[MyActor], "myActor")
val future = ask(myActor, GetFinalValue).mapTo[String]
future.foreach { str =>
  println(s"got $str")
}
which prints got get the string.
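As a final note, I would advise against using the ask pattern inside actors. The basic functionality of ask can easily be achieved with just tell and forward. The code is also shorter and is not cluttered by the constant need for an implicit val timeout.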
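Just adding on top of @IvanStanislavciuc's great post. You have already noticed that you lose the reference to the sender inside futures. A simple solution is to capture it up front.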
That means, in MyActor, change:
ask(myManageActor, GetValue).pipeTo(sender())
to:
val originalSender = sender()
ask(myManageActor, GetValue).pipeTo(originalSender)
In ManagerMyActor, change:
ask(myTokenActor, CalculateValue).pipeTo(sender())
to:
val originalSender = sender()
ask(myTokenActor, CalculateValue).pipeTo(originalSender)
In TokenMyActor2:
val originalSender = sender()
Future { "get the string" }.pipeTo(originalSender)
The code runs in Scastie.