Akka - 发件人[null] 发送了类型的消息
Akka - Sender[null] sent message of type
我有一条喷射路线,我在其中声明了我的一位演员。
val myActor = actorRefFactory.actorSelection("/user/my-actor")
我的路线如下:
get {
path(Segment / Segment) { (poolId, trackId) =>
respondWithMediaType(MediaTypes.`application/json`) {
val request = Request(poolId, trackId)
val f = (myActor ? request)
.recoverWith {
case a: AskTimeoutException =>
Future.failed[StandardRoute](throw new Exception(s"We got a timeout", a))
case e: Exception => Future.failed[StandardRoute](throw new Exception(s"We got an error", e))
}
onComplete(f) {
case Success(resp) => complete(OK, resp)
case Failure(e) =>
log.error(s"Fatal request error: $trackId / $poolId", e)
complete(InternalServerError, ErrorCodes.ErrorNotHandled)
}
}
}
}
有时,我可以看到当我同时收到很多请求时,其中一些可能会失败并显示以下消息:
Caused by: akka.pattern.AskTimeoutException: Ask timed out on
[ActorSelection[Anchor(akka://default/), Path(/user/my-actor)]] after
[8000 ms]. Sender[null] sent message of type
"my.company.messages.Request".
问题是如果我接受相同的请求并尝试再次发送它,它会起作用,只是有时会发生这种情况,我不知道如何解决。
演员确实在做很多事情,里面有很多未来,直到它 returns 成为喷雾路线的价值。
在 actor 内部,我创建了一个名为 replyTo 的 val 以保留发件人的值。
关于为什么有时会出现此错误的任何想法?
编辑
只是一个关于我如何管理 myActor 的例子:
class MyActor extends Actor with ActorLogging {
private implicit val timeout = Timeout(8.seconds)
def receive = {
case req: Request =>
val replyTo = sender()
doOneThing.map { one =>
doSecondThing(one).map { sec =>
replyTo ! sec
}
}
}
}
doOneThing 和 doSecondThing 是 Futures...我有很多 Futures 分布在这个 actor 周围以适应不同的情况。
您看到的发件人[null] 是正常行为。 ask 方法 ?
采用默认值 ActorRef.noSender
的隐式参数 sender
。通常,如果您在 Actor
中,您在范围内有一个隐式的 ActorRef
,称为 self
,但由于您不在 Actor
中,它只是采用默认值。
您的错误原因可能是收到您消息的 Actor
没有及时回复。
我认为您没有在此处使用路由,这就是您收到超时异常的原因。
例如,
假设您的一个请求需要 1 秒才能执行。
并假设您一次收到 4 个请求,当请求到达一个 actor 时,它会附加到一个队列中,并且对于每个请求,您都在等待 8 秒。所以在队列中假设第 4 个请求将在 4 秒内执行,因此您将在 8 秒内得到响应。但是当一次有 100 个请求并且您的一个演员可以处理它们时,对于第 100 个请求,执行将需要 100 秒,但是您只等待 8.second 第 100 个请求,这就是您得到的原因超时异常。
所以解决方案是,你可以在这里使用路由 -
system.actorOf(RoundRobinPool(并发).props(Props(new MyActor())))
并且您可以在系统可用进程上设置并发。假设设置并发值 100 那么现在对于第 100 个请求,执行只需要 2 秒。所以我认为路由可以是一个解决方案。
如果您不知道那里会有多少请求,它可能是 1k 或更多,那么您可以动态创建演员,您可以根据请求动态创建演员。因此,无论何时收到请求,您的一个独立演员都会在那里为其提供服务。
当我没有正确地将响应发回给发件人时,我会遇到这种错误。
不正确(只是返回值):
def receive = {
case DataFetch =>
data
}
正确(将值发送到 sender
):
def receive = {
case DataFetch =>
sender ! data
}
我有一条喷射路线,我在其中声明了我的一位演员。
val myActor = actorRefFactory.actorSelection("/user/my-actor")
我的路线如下:
get {
path(Segment / Segment) { (poolId, trackId) =>
respondWithMediaType(MediaTypes.`application/json`) {
val request = Request(poolId, trackId)
val f = (myActor ? request)
.recoverWith {
case a: AskTimeoutException =>
Future.failed[StandardRoute](throw new Exception(s"We got a timeout", a))
case e: Exception => Future.failed[StandardRoute](throw new Exception(s"We got an error", e))
}
onComplete(f) {
case Success(resp) => complete(OK, resp)
case Failure(e) =>
log.error(s"Fatal request error: $trackId / $poolId", e)
complete(InternalServerError, ErrorCodes.ErrorNotHandled)
}
}
}
}
有时,我可以看到当我同时收到很多请求时,其中一些可能会失败并显示以下消息:
Caused by: akka.pattern.AskTimeoutException: Ask timed out on [ActorSelection[Anchor(akka://default/), Path(/user/my-actor)]] after [8000 ms]. Sender[null] sent message of type "my.company.messages.Request".
问题是如果我接受相同的请求并尝试再次发送它,它会起作用,只是有时会发生这种情况,我不知道如何解决。
演员确实在做很多事情,里面有很多未来,直到它 returns 成为喷雾路线的价值。
在 actor 内部,我创建了一个名为 replyTo 的 val 以保留发件人的值。
关于为什么有时会出现此错误的任何想法?
编辑
只是一个关于我如何管理 myActor 的例子:
class MyActor extends Actor with ActorLogging {
private implicit val timeout = Timeout(8.seconds)
def receive = {
case req: Request =>
val replyTo = sender()
doOneThing.map { one =>
doSecondThing(one).map { sec =>
replyTo ! sec
}
}
}
}
doOneThing 和 doSecondThing 是 Futures...我有很多 Futures 分布在这个 actor 周围以适应不同的情况。
您看到的发件人[null] 是正常行为。 ask 方法 ?
采用默认值 ActorRef.noSender
的隐式参数 sender
。通常,如果您在 Actor
中,您在范围内有一个隐式的 ActorRef
,称为 self
,但由于您不在 Actor
中,它只是采用默认值。
您的错误原因可能是收到您消息的 Actor
没有及时回复。
我认为您没有在此处使用路由,这就是您收到超时异常的原因。 例如, 假设您的一个请求需要 1 秒才能执行。 并假设您一次收到 4 个请求,当请求到达一个 actor 时,它会附加到一个队列中,并且对于每个请求,您都在等待 8 秒。所以在队列中假设第 4 个请求将在 4 秒内执行,因此您将在 8 秒内得到响应。但是当一次有 100 个请求并且您的一个演员可以处理它们时,对于第 100 个请求,执行将需要 100 秒,但是您只等待 8.second 第 100 个请求,这就是您得到的原因超时异常。
所以解决方案是,你可以在这里使用路由 -
system.actorOf(RoundRobinPool(并发).props(Props(new MyActor())))
并且您可以在系统可用进程上设置并发。假设设置并发值 100 那么现在对于第 100 个请求,执行只需要 2 秒。所以我认为路由可以是一个解决方案。
如果您不知道那里会有多少请求,它可能是 1k 或更多,那么您可以动态创建演员,您可以根据请求动态创建演员。因此,无论何时收到请求,您的一个独立演员都会在那里为其提供服务。
当我没有正确地将响应发回给发件人时,我会遇到这种错误。
不正确(只是返回值):
def receive = {
case DataFetch =>
data
}
正确(将值发送到 sender
):
def receive = {
case DataFetch =>
sender ! data
}