在两个演员之间使用 pipeTo 时得到 akka 死信

getting akka dead letters when using pipeTo between two actors

我有一个用例,其中我有一个参与者层次结构

parent -> childABC -> workerchild

现在工作人员 child 工作并将其结果发送到其 parent(childABC,它是 parent 的 child)并且 child actor(childABC) 将结果发送回 parent actor 我正在使用 pipeTo 并在此处获取死信是我的代码

parent演员:

final case object GetFinalValue

class MyActor extends Actor{
  import context.dispatcher
  import akka.pattern.pipe
  val log = LoggerFactory.getLogger(this.getClass)
  val myManageActor = context.actorOf(Props[ManagerMyActor],"Managemyactor")
  implicit val timeout = Timeout(ReadTimeIntervalValue.getInterval(), SECONDS)

  override def receive: Receive = {
    case GetFinalValue=>
      ask(myManageActor,GetValue).pipeTo(sender())

    case message =>
      log.warn(" Unhandled message received : {}", message)
      unhandled(message)
  }

}

childABC(根据我上面给出的例子)

final case object GetValue

class ManagerMyActor extends Actor{
  import context.dispatcher
  import akka.pattern.pipe
  val log = LoggerFactory.getLogger(this.getClass)
  val myTokenActor = context.actorOf(Props[TokenMyActor2],"toknMyActor2")
  implicit val timeout = Timeout(ReadTimeIntervalValue.getInterval(), SECONDS)

  override def receive: Receive = {
    case GetValue=>
      ask(myTokenActor,CalculateValue).pipeTo(sender())
 
    case message =>
      log.warn(" Unhandled message received : {}", message)
      unhandled(message)
  }

}

child演员:

final case object CalculateValue

class TokenMyActor2 extends Actor{
  import context.dispatcher
  import akka.pattern.pipe
  val log = LoggerFactory.getLogger(this.getClass)

  override def receive: Receive = {
    case CalculateValue=>
      val future = Future{ "get the string"
      }
      val bac = future.map{result =>
          sender ! result
      }//.pipeTo(sender())


    case message =>
      log.warn("Actor MyActor: Unhandled message received : {}", message)
      unhandled(message)
  }

}


def main(args: Array[String]): Unit = {
    implicit val timeout = Timeout(ReadTimeIntervalValue.getInterval(), SECONDS)

    val myActor = system.actorOf(Props[MyActor],"myActor")
    val future = ask(myActor, GetFinalValue).mapTo[String]
    future.map {str =>
      log.info ("string is {}",str)
    }

这是日志:

[INFO] [akkaDeadLetter][01/12/2021 19:17:22.000] [api-akka.actor.default-dispatcher-5] [akka://api/deadLetters] Message [java.lang.String] from Actor[akka://api/user/myActor/Managemyactor/toknMyActor2#1239397461] to Actor[akka://api/deadLetters] was not delivered. [1] dead letters encountered. If this is not an expected behavior then Actor[akka://api/deadLetters] may have terminated unexpectedly. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.
[INFO] [akkaDeadLetter][01/12/2021 19:17:41.989] [api-akka.actor.default-dispatcher-7] [akka://api/deadLetters] Message [akka.actor.Status$Failure] from Actor[akka://api/user/myActor#1829301550] to Actor[akka://api/deadLetters] was not delivered. [2] dead letters encountered. If this is not an expected behavior then Actor[akka://api/deadLetters] may have terminated unexpectedly. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.
[INFO] [akkaDeadLetter][01/12/2021 19:17:41.996] [api-akka.actor.default-dispatcher-7] [akka://api/deadLetters] Message [akka.actor.Status$Failure] from Actor[akka://api/user/myActor/Managemyactor#-269929265] to Actor[akka://api/deadLetters] was not delivered. [3] dead letters encountered. If this is not an expected behavior then Actor[akka://api/deadLetters] may have terminated unexpectedly. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.

请指导我哪里错了,或者pipeTo不应该这样使用?如果是这样,我应该怎么做才能让它发挥作用

不确定是否有意但 ask(myManageActor,GetValue).pipeTo(sender()) 可以实现为 forward

class MyActor extends Actor {
  lazy val myManageActor: ActorRef = ???

  override def receive: Receive = {
    case GetFinalValue =>
      myManageActor.forward(GetValue)
  }
}

forwardtell 相同,但它保留邮件的原始发件人。

这可以应用于 MyActorManagerMyActor

TokenMyActor2的情况下,你不应该使用

future.map{ result =>
          sender ! result
      }

因为它破坏了 akka 上下文封装,如 docs

中所指定

When using future callbacks, such as onComplete, or map such as thenRun, or thenApply inside actors you need to carefully avoid closing over the containing actor’s reference, i.e. do not call methods or access mutable state on the enclosing actor from within the callback. This would break the actor encapsulation and may introduce synchronization bugs and race conditions because the callback will be scheduled concurrently to the enclosing actor. Unfortunately there is not yet a way to detect these illegal accesses at compile time. See also: Actors and shared mutable state

您应该改为依赖 Future(???).pipeTo(sender()),它可以安全地与 sender() 一起使用。

应用这些更改后,代码确实按预期工作

case object GetFinalValue
case object GetValue
case object CalculateValue

class MyActor extends Actor {
  private val myManageActor: ActorRef =
    context.actorOf(Props[ManagerMyActor], "myManageActor")

  override def receive: Receive = { case GetFinalValue =>
    myManageActor.forward(GetValue)
  }
}

class ManagerMyActor extends Actor {
  private val myTokenActor =
    context.actorOf(Props[TokenMyActor2], "toknMyActor2")

  override def receive: Receive = { case GetValue =>
    myTokenActor.forward(CalculateValue)
  }

}

class TokenMyActor2 extends Actor {
  import context.dispatcher

  override def receive: Receive = { case CalculateValue =>
    val future = Future { "get the string" }
    future.pipeTo(sender())
  }
}
implicit val timeout = Timeout(3, SECONDS)
implicit val system = ActorSystem("adasd")
import system.dispatcher
val myActor = system.actorOf(Props[MyActor], "myActor")
val future = ask(myActor, GetFinalValue).mapTo[String]
future.foreach { str =>
  println(s"got $str")
}

产生 got get the string.

最后一点,我建议不要在演员中使用 ask 模式。只需 tellforward 即可轻松实现 ask 的基本功能。此外,代码更短,不会因 implicit val timeout

的不断需要而超载

只是在@IvanStanislavciuc 的伟大 post 之上添加。您已经注意到您在 futures 中丢失了对发件人的引用。一个简单的解决方案是将它放在前面。

表示在MyActor变化:

ask(myManageActor,GetValue).pipeTo(sender()) // Won't work

进入:

val originalSender = sender()
ask(myTokenActor,CalculateValue).pipeTo(originalSender)

ManagerMyActor中,更改:

ask(myTokenActor,CalculateValue).pipeTo(sender()) // Won't work

进入:

val originalSender = sender()
ask(myManageActor,GetValue).pipeTo(originalSender)

TokenMyActor2中:

val originalSender = sender()
Future{ "get the string" }.pipeTo(originalSender)

代码 运行 在 Scastie