当 actor 包含异步方法时,这将导致死信错误和 Ask 超时异常
when an actor contains an async method, this will lead to dead letter error with the Ask time out exception
我使用询问模式在名为 fun-F 的函数中向名为 actor-A 的参与者发送请求。 actor 将获得另一个系统以异步方式生成的 ID,完成后,我会将包含此 ID 的消息转发给另一个名为 actor-B 的 actor,actor-B 将执行一些数据库操作,然后发回DB 操作会向发送者发送一条消息,因为在我的例子中我使用转发模式,所以 actor-B 将发送者识别为 fun-F,akka 会给 fun-F 一个临时的 actor 名称,所以返回值应该交付给临时演员。
我的问题是:
如果我使用sync-method从另一个系统获取ID,然后将这个消息转发给actor-B,actor-B的DB操作后,结果可以传递给fun-F的值,并且fun-F 被 akka 框架运行时定义为临时演员 Actor[akka://ai-feedback-service/temp/$b]。
如果我使用异步方法从另一个系统获取ID,当它完成时,我将在另一个回调线程的oncompleted {}代码块中转发消息,处理actor-B中的DB操作成功,但是返回值无法传递到fun-F中定义的值,此时fun-F被akka framwork runtime定义为Actor[akka://ai-feedback-service/deadLetters]。所以 actor-B 迷路了,不知道如何返回或应该将此消息传递到哪里,这将导致我的日志中抛出 Ask time out 异常。
我该如何处理这个问题?或者我怎样才能避免这个死信请求超时异常?
下面是我的代码:
// this is the so-called fun-F [createFeedback]
def createFeedback(query: String,
response: String,
userId: Long,
userAgent: String,
requestId: Long,
errType: Short,
memo: String): Future[java.lang.Long] = {
val ticket = Ticket(userId,
requestId,
query,
response,
errType,
userAgent,
memo)
val issueId = (jiraActor ? CreateJiraTicketSignal(ticket))
.mapTo[CreateFeedbackResponseSignal].map{ r =>
r.issueId.asInstanceOf[java.lang.Long]
}
issueId
}
//this is the so-called actor-A [jiraActor]
//receive method are run in its parent actor for some authorization
//in this actor only override the handleActorMsg method to deal msg
override def handleActorMsg(msg: ActorMsgSignal): Unit = {
msg match {
case s:CreateJiraTicketSignal =>
val issueId = createIssue(cookieCache.cookieContext.flag,
cookieCache.cookieContext.cookie,
s.ticket)
println(s">> ${sender()} before map $issueId")
issueId.map{
case(id:Long) =>
println(s">> again++issueId = $id ${id.getClass}")
println(s">>> $self / ${sender()}")
println("again ++ jira action finished")
dbActor.forward(CreateFeedbackSignal(id,s.ticket))
case(message:String) if(!s.retry) =>
self ! CreateJiraTicketSignal(s.ticket,true)
case(message:String) if(s.retry) =>
log.error("cannot create ticket :" + message)
}
println(s">> after map $issueId")
}
//this is the so-called actor-B [dbActor]
override def receive: Receive = {
case CreateFeedbackSignal(issueId:Long, ticket:Ticket) =>
val timestampTicks = System.currentTimeMillis()
val description: String = Json.obj("question" -> ticket.query,
"answer" -> ticket.response)
.toString()
dao.createFeedback(issueId,
ticket.usrId.toString,
description,
FeedbackStatus.Open.getValue
.asInstanceOf[Byte],
new Timestamp(timestampTicks),
new Timestamp(timestampTicks),
ticket.usrAgent,
ticket.errType,
ticket.memo)
println(s">> sender = ${sender()}")
sender() ! (CreateFeedbackResponseSignal(issueId))
println("db issue id is " + issueId)
println("db action finished")
}
要避免死信问题,请执行以下操作:
对于每个请求,使用一个可以与请求的最终目标相关联的标识符(可能是 requestId
)。也就是说,将要传递给 createFeedback
方法的 requestId
绑定到该方法的调用者 (ActorRef
),然后通过消息链传递此 ID。您可以使用地图来保存这些关联。
- 更改
CreateFeedbackResponseSignal(issueId)
以包含 Ticket
class 中的 requestId
:CreateFeedbackResponseSignal(requestId, issueId)
。
当从 actor 内部处理 Future
的异步结果时,pipe
Future
的结果到 self
而不是使用回调。
- 使用这种方法,
createIssue
的结果将在结果可用时发送到 jiraActor
。 jiraActor
然后将该结果发送到 dbActor
。
jiraActor
将是 dbActor
中的 sender
。当 jiraActor
收到来自 dbActor
的结果时,jiraActor
可以在其内部映射中查找对目标的引用。
下面是一个模拟您的用例并可在 ScalaFiddle 中运行的简单示例:
import akka.actor._
import akka.pattern.{ask, pipe}
import akka.util.Timeout
import language.postfixOps
import scala.concurrent._
import scala.concurrent.duration._
case class Signal(requestId: Long)
case class ResponseSignal(requestId: Long, issueId: Long)
object ActorA {
def props(actorB: ActorRef) = Props(new ActorA(actorB))
}
class ActorA(dbActor: ActorRef) extends Actor {
import context.dispatcher
var targets: Map[Long, ActorRef] = Map.empty
def receive = {
case Signal(requestId) =>
val s = sender
targets = targets + (requestId -> s)
createIssue(requestId).mapTo[Tuple2[Long, Long]].pipeTo(self) // <-- use pipeTo
case ids: Tuple2[Long, Long] =>
println(s"Sending $ids to dbActor")
dbActor ! ids
case r: ResponseSignal =>
println(s"Received from dbActor: $r")
val target = targets.get(r.requestId)
println(s"In actorA, sending to: $target")
target.foreach(_ ! r)
targets = targets - r.requestId
}
}
class DbActor extends Actor {
def receive = {
case (requestId: Long, issueId: Long) =>
val response = ResponseSignal(requestId, issueId)
println(s"In dbActor, sending $response to $sender")
sender ! response
}
}
val system = ActorSystem("jiratest")
implicit val ec = system.dispatcher
val dbActor = system.actorOf(Props[DbActor])
val jiraActor = system.actorOf(Props(new ActorA(dbActor)))
val requestId = 2L
def createIssue(requestId: Long): Future[(Long, Long)] = {
println(s"Creating an issue ID for requestId[$requestId]")
Future((requestId, 99L))
}
def createFeedback(): Future[Long] = {
implicit val timeout = Timeout(5.seconds)
val res = (jiraActor ? Signal(requestId)).mapTo[ResponseSignal]
res.map(_.issueId)
}
createFeedback().onComplete { x =>
println(s"Done: $x")
}
运行 ScalaFiddle 中的上述代码导致以下输出:
Creating an issue ID for requestId[2]
Sending (2,99) to dbActor
In dbActor, sending ResponseSignal(2,99) to Actor[akka://jiratest/user/$b#-710097339]
Received from dbActor: ResponseSignal(2,99)
In actorA, sending to: Some(Actor[akka://jiratest/temp/$a])
Done: Success(99)
我使用询问模式在名为 fun-F 的函数中向名为 actor-A 的参与者发送请求。 actor 将获得另一个系统以异步方式生成的 ID,完成后,我会将包含此 ID 的消息转发给另一个名为 actor-B 的 actor,actor-B 将执行一些数据库操作,然后发回DB 操作会向发送者发送一条消息,因为在我的例子中我使用转发模式,所以 actor-B 将发送者识别为 fun-F,akka 会给 fun-F 一个临时的 actor 名称,所以返回值应该交付给临时演员。
我的问题是:
如果我使用sync-method从另一个系统获取ID,然后将这个消息转发给actor-B,actor-B的DB操作后,结果可以传递给fun-F的值,并且fun-F 被 akka 框架运行时定义为临时演员 Actor[akka://ai-feedback-service/temp/$b]。
如果我使用异步方法从另一个系统获取ID,当它完成时,我将在另一个回调线程的oncompleted {}代码块中转发消息,处理actor-B中的DB操作成功,但是返回值无法传递到fun-F中定义的值,此时fun-F被akka framwork runtime定义为Actor[akka://ai-feedback-service/deadLetters]。所以 actor-B 迷路了,不知道如何返回或应该将此消息传递到哪里,这将导致我的日志中抛出 Ask time out 异常。
我该如何处理这个问题?或者我怎样才能避免这个死信请求超时异常?
下面是我的代码:
// this is the so-called fun-F [createFeedback]
def createFeedback(query: String,
response: String,
userId: Long,
userAgent: String,
requestId: Long,
errType: Short,
memo: String): Future[java.lang.Long] = {
val ticket = Ticket(userId,
requestId,
query,
response,
errType,
userAgent,
memo)
val issueId = (jiraActor ? CreateJiraTicketSignal(ticket))
.mapTo[CreateFeedbackResponseSignal].map{ r =>
r.issueId.asInstanceOf[java.lang.Long]
}
issueId
}
//this is the so-called actor-A [jiraActor]
//receive method are run in its parent actor for some authorization
//in this actor only override the handleActorMsg method to deal msg
override def handleActorMsg(msg: ActorMsgSignal): Unit = {
msg match {
case s:CreateJiraTicketSignal =>
val issueId = createIssue(cookieCache.cookieContext.flag,
cookieCache.cookieContext.cookie,
s.ticket)
println(s">> ${sender()} before map $issueId")
issueId.map{
case(id:Long) =>
println(s">> again++issueId = $id ${id.getClass}")
println(s">>> $self / ${sender()}")
println("again ++ jira action finished")
dbActor.forward(CreateFeedbackSignal(id,s.ticket))
case(message:String) if(!s.retry) =>
self ! CreateJiraTicketSignal(s.ticket,true)
case(message:String) if(s.retry) =>
log.error("cannot create ticket :" + message)
}
println(s">> after map $issueId")
}
//this is the so-called actor-B [dbActor]
override def receive: Receive = {
case CreateFeedbackSignal(issueId:Long, ticket:Ticket) =>
val timestampTicks = System.currentTimeMillis()
val description: String = Json.obj("question" -> ticket.query,
"answer" -> ticket.response)
.toString()
dao.createFeedback(issueId,
ticket.usrId.toString,
description,
FeedbackStatus.Open.getValue
.asInstanceOf[Byte],
new Timestamp(timestampTicks),
new Timestamp(timestampTicks),
ticket.usrAgent,
ticket.errType,
ticket.memo)
println(s">> sender = ${sender()}")
sender() ! (CreateFeedbackResponseSignal(issueId))
println("db issue id is " + issueId)
println("db action finished")
}
要避免死信问题,请执行以下操作:
对于每个请求,使用一个可以与请求的最终目标相关联的标识符(可能是
requestId
)。也就是说,将要传递给createFeedback
方法的requestId
绑定到该方法的调用者 (ActorRef
),然后通过消息链传递此 ID。您可以使用地图来保存这些关联。- 更改
CreateFeedbackResponseSignal(issueId)
以包含Ticket
class 中的requestId
:CreateFeedbackResponseSignal(requestId, issueId)
。
- 更改
当从 actor 内部处理
Future
的异步结果时,pipe
Future
的结果到self
而不是使用回调。- 使用这种方法,
createIssue
的结果将在结果可用时发送到jiraActor
。jiraActor
然后将该结果发送到dbActor
。 jiraActor
将是dbActor
中的sender
。当jiraActor
收到来自dbActor
的结果时,jiraActor
可以在其内部映射中查找对目标的引用。
- 使用这种方法,
下面是一个模拟您的用例并可在 ScalaFiddle 中运行的简单示例:
import akka.actor._
import akka.pattern.{ask, pipe}
import akka.util.Timeout
import language.postfixOps
import scala.concurrent._
import scala.concurrent.duration._
case class Signal(requestId: Long)
case class ResponseSignal(requestId: Long, issueId: Long)
object ActorA {
def props(actorB: ActorRef) = Props(new ActorA(actorB))
}
class ActorA(dbActor: ActorRef) extends Actor {
import context.dispatcher
var targets: Map[Long, ActorRef] = Map.empty
def receive = {
case Signal(requestId) =>
val s = sender
targets = targets + (requestId -> s)
createIssue(requestId).mapTo[Tuple2[Long, Long]].pipeTo(self) // <-- use pipeTo
case ids: Tuple2[Long, Long] =>
println(s"Sending $ids to dbActor")
dbActor ! ids
case r: ResponseSignal =>
println(s"Received from dbActor: $r")
val target = targets.get(r.requestId)
println(s"In actorA, sending to: $target")
target.foreach(_ ! r)
targets = targets - r.requestId
}
}
class DbActor extends Actor {
def receive = {
case (requestId: Long, issueId: Long) =>
val response = ResponseSignal(requestId, issueId)
println(s"In dbActor, sending $response to $sender")
sender ! response
}
}
val system = ActorSystem("jiratest")
implicit val ec = system.dispatcher
val dbActor = system.actorOf(Props[DbActor])
val jiraActor = system.actorOf(Props(new ActorA(dbActor)))
val requestId = 2L
def createIssue(requestId: Long): Future[(Long, Long)] = {
println(s"Creating an issue ID for requestId[$requestId]")
Future((requestId, 99L))
}
def createFeedback(): Future[Long] = {
implicit val timeout = Timeout(5.seconds)
val res = (jiraActor ? Signal(requestId)).mapTo[ResponseSignal]
res.map(_.issueId)
}
createFeedback().onComplete { x =>
println(s"Done: $x")
}
运行 ScalaFiddle 中的上述代码导致以下输出:
Creating an issue ID for requestId[2]
Sending (2,99) to dbActor
In dbActor, sending ResponseSignal(2,99) to Actor[akka://jiratest/user/$b#-710097339]
Received from dbActor: ResponseSignal(2,99)
In actorA, sending to: Some(Actor[akka://jiratest/temp/$a])
Done: Success(99)