当 actor 包含异步方法时,这将导致死信错误和 Ask 超时异常

when an actor contains an async method, this will lead to dead letter error with the Ask time out exception

我使用询问模式在名为 fun-F 的函数中向名为 actor-A 的参与者发送请求。 actor 将获得另一个系统以异步方式生成的 ID,完成后,我会将包含此 ID 的消息转发给另一个名为 actor-B 的 actor,actor-B 将执行一些数据库操作,然后发回DB 操作会向发送者发送一条消息,因为在我的例子中我使用转发模式,所以 actor-B 将发送者识别为 fun-F,akka 会给 fun-F 一个临时的 actor 名称,所以返回值应该交付给临时演员。

我的问题是:

如果我使用sync-method从另一个系统获取ID,然后将这个消息转发给actor-B,actor-B的DB操作后,结果可以传递给fun-F的值,并且fun-F 被 akka 框架运行时定义为临时演员 Actor[akka://ai-feedback-service/temp/$b]。

如果我使用异步方法从另一个系统获取ID,当它完成时,我将在另一个回调线程的oncompleted {}代码块中转发消息,处理actor-B中的DB操作成功,但是返回值无法传递到fun-F中定义的值,此时fun-F被akka framwork runtime定义为Actor[akka://ai-feedback-service/deadLetters]。所以 actor-B 迷路了,不知道如何返回或应该将此消息传递到哪里,这将导致我的日志中抛出 Ask time out 异常。

我该如何处理这个问题?或者我怎样才能避免这个死信请求超时异常?

下面是我的代码:

// this is the so-called fun-F [createFeedback]
def createFeedback(query: String, 
                   response: String, 
                   userId: Long, 
                   userAgent: String, 
                   requestId: Long, 
                   errType: Short, 
                   memo: String): Future[java.lang.Long] = {
    val ticket = Ticket(userId,
                        requestId,
                        query,
                        response,
                        errType,
                        userAgent,
                        memo)
    val issueId = (jiraActor ? CreateJiraTicketSignal(ticket))
                  .mapTo[CreateFeedbackResponseSignal].map{ r =>
        r.issueId.asInstanceOf[java.lang.Long]
    }
    issueId
}


//this is the so-called actor-A [jiraActor]
//receive method are run in its parent actor for some authorization
//in this actor only override the handleActorMsg method to deal msg
override def handleActorMsg(msg: ActorMsgSignal): Unit = {
    msg match {
        case s:CreateJiraTicketSignal =>
            val issueId = createIssue(cookieCache.cookieContext.flag,
                                     cookieCache.cookieContext.cookie,
                                     s.ticket)
            println(s">> ${sender()} before map $issueId")
            issueId.map{
                case(id:Long) =>
                    println(s">> again++issueId = $id ${id.getClass}")
                    println(s">>> $self / ${sender()}")
                    println("again ++ jira action finished")
                    dbActor.forward(CreateFeedbackSignal(id,s.ticket))
                case(message:String) if(!s.retry) =>
                    self ! CreateJiraTicketSignal(s.ticket,true)
                case(message:String) if(s.retry) =>
                    log.error("cannot create ticket :" + message)
            }
            println(s">> after map $issueId")
}


//this is the so-called actor-B [dbActor]
override def receive: Receive = {
    case CreateFeedbackSignal(issueId:Long, ticket:Ticket) =>
        val timestampTicks = System.currentTimeMillis()
        val description: String = Json.obj("question" -> ticket.query, 
                                          "answer" -> ticket.response)
                                          .toString()
        dao.createFeedback(issueId,
                           ticket.usrId.toString,
                           description,
                           FeedbackStatus.Open.getValue
                                .asInstanceOf[Byte],
                           new Timestamp(timestampTicks),
                           new Timestamp(timestampTicks),
                           ticket.usrAgent,
                           ticket.errType,
                           ticket.memo)

        println(s">> sender = ${sender()}")
        sender() ! (CreateFeedbackResponseSignal(issueId))
        println("db issue id is " + issueId)
        println("db action finished")
}

要避免死信问题,请执行以下操作:

  1. 对于每个请求,使用一个可以与请求的最终目标相关联的标识符(可能是 requestId)。也就是说,将要传递给 createFeedback 方法的 requestId 绑定到该方法的调用者 (ActorRef),然后通过消息链传递此 ID。您可以使用地图来保存这些关联。

    • 更改 CreateFeedbackResponseSignal(issueId) 以包含 Ticket class 中的 requestIdCreateFeedbackResponseSignal(requestId, issueId)
  2. 当从 actor 内部处理 Future 的异步结果时,pipe Future 的结果到 self 而不是使用回调。

    • 使用这种方法,createIssue 的结果将在结果可用时发送到 jiraActorjiraActor 然后将该结果发送到 dbActor
    • jiraActor 将是 dbActor 中的 sender。当 jiraActor 收到来自 dbActor 的结果时,jiraActor 可以在其内部映射中查找对目标的引用。

下面是一个模拟您的用例并可在 ScalaFiddle 中运行的简单示例:

import akka.actor._
import akka.pattern.{ask, pipe}
import akka.util.Timeout

import language.postfixOps

import scala.concurrent._
import scala.concurrent.duration._

case class Signal(requestId: Long)
case class ResponseSignal(requestId: Long, issueId: Long)

object ActorA {
  def props(actorB: ActorRef) = Props(new ActorA(actorB))
}

class ActorA(dbActor: ActorRef) extends Actor {
  import context.dispatcher

  var targets: Map[Long, ActorRef] = Map.empty

  def receive = {
    case Signal(requestId) =>
      val s = sender
      targets = targets + (requestId -> s)
      createIssue(requestId).mapTo[Tuple2[Long, Long]].pipeTo(self) // <-- use pipeTo
    case ids: Tuple2[Long, Long] =>
      println(s"Sending $ids to dbActor")
      dbActor ! ids
    case r: ResponseSignal =>
      println(s"Received from dbActor: $r")
      val target = targets.get(r.requestId)
      println(s"In actorA, sending to: $target")
      target.foreach(_ ! r)
      targets = targets - r.requestId
  }
}

class DbActor extends Actor {
  def receive = {
    case (requestId: Long, issueId: Long) =>
      val response = ResponseSignal(requestId, issueId)
      println(s"In dbActor, sending $response to $sender")
      sender ! response
  }
}

val system = ActorSystem("jiratest")
implicit val ec = system.dispatcher

val dbActor = system.actorOf(Props[DbActor])
val jiraActor = system.actorOf(Props(new ActorA(dbActor)))

val requestId = 2L

def createIssue(requestId: Long): Future[(Long, Long)] = {
  println(s"Creating an issue ID for requestId[$requestId]")
  Future((requestId, 99L))
}

def createFeedback(): Future[Long] = {
  implicit val timeout = Timeout(5.seconds)
  val res = (jiraActor ? Signal(requestId)).mapTo[ResponseSignal]
  res.map(_.issueId)
}

createFeedback().onComplete { x =>
  println(s"Done: $x")
}

运行 ScalaFiddle 中的上述代码导致以下输出:

Creating an issue ID for requestId[2]
Sending (2,99) to dbActor
In dbActor, sending ResponseSignal(2,99) to Actor[akka://jiratest/user/$b#-710097339]
Received from dbActor: ResponseSignal(2,99)
In actorA, sending to: Some(Actor[akka://jiratest/temp/$a])
Done: Success(99)