Akka - 发件人[null] 发送了类型的消息

Akka - Sender[null] sent message of type

我有一条喷射路线,我在其中声明了我的一位演员。

val myActor = actorRefFactory.actorSelection("/user/my-actor")

我的路线如下:

get {
  path(Segment / Segment) { (poolId, trackId) =>
    respondWithMediaType(MediaTypes.`application/json`) {
      val request = Request(poolId, trackId)

      val f = (myActor ? request)
        .recoverWith {
          case a: AskTimeoutException =>
            Future.failed[StandardRoute](throw new Exception(s"We got a timeout", a))

          case e: Exception => Future.failed[StandardRoute](throw new Exception(s"We got an error", e))
        }

      onComplete(f) {
        case Success(resp) => complete(OK, resp)

        case Failure(e) =>
          log.error(s"Fatal request error: $trackId / $poolId", e)
          complete(InternalServerError, ErrorCodes.ErrorNotHandled)
      }
    }
  }
}

有时,我可以看到当我同时收到很多请求时,其中一些可能会失败并显示以下消息:

Caused by: akka.pattern.AskTimeoutException: Ask timed out on [ActorSelection[Anchor(akka://default/), Path(/user/my-actor)]] after [8000 ms]. Sender[null] sent message of type "my.company.messages.Request".

问题是如果我接受相同的请求并尝试再次发送它,它会起作用,只是有时会发生这种情况,我不知道如何解决。

演员确实在做很多事情,里面有很多未来,直到它 returns 成为喷雾路线的价值。

在 actor 内部,我创建了一个名为 replyTo 的 val 以保留发件人的值。

关于为什么有时会出现此错误的任何想法?

编辑

只是一个关于我如何管理 myActor 的例子:

class MyActor extends Actor with ActorLogging {

  private implicit val timeout = Timeout(8.seconds)

  def receive = {

    case req: Request =>

      val replyTo = sender()

      doOneThing.map { one =>

        doSecondThing(one).map { sec =>
          replyTo ! sec
        }
      }
  }
}

doOneThing 和 doSecondThing 是 Futures...我有很多 Futures 分布在这个 actor 周围以适应不同的情况。

您看到的发件人[null] 是正常行为。 ask 方法 ? 采用默认值 ActorRef.noSender 的隐式参数 sender。通常,如果您在 Actor 中,您在范围内有一个隐式的 ActorRef,称为 self,但由于您不在 Actor 中,它只是采用默认值。

您的错误原因可能是收到您消息的 Actor 没有及时回复。

我认为您没有在此处使用路由,这就是您收到超时异常的原因。 例如, 假设您的一个请求需要 1 秒才能执行。 并假设您一次收到 4 个请求,当请求到达一个 actor 时,它会附加到一个队列中,并且对于每个请求,您都在等待 8 秒。所以在队列中假设第 4 个请求将在 4 秒内执行,因此您将在 8 秒内得到响应。但是当一次有 100 个请求并且您的一个演员可以处理它们时,对于第 100 个请求,执行将需要 100 秒,但是您只等待 8.second 第 100 个请求,这就是您得到的原因超时异常。

所以解决方案是,你可以在这里使用路由 -

system.actorOf(RoundRobinPool(并发).props(Props(new MyActor())))

并且您可以在系统可用进程上设置并发。假设设置并发值 100 那么现在对于第 100 个请求,执行只需要 2 秒。所以我认为路由可以是一个解决方案。

如果您不知道那里会有多少请求,它可能是 1k 或更多,那么您可以动态创建演员,您可以根据请求动态创建演员。因此,无论何时收到请求,您的一个独立演员都会在那里为其提供服务。

当我没有正确地将响应发回给发件人时,我会遇到这种错误。

不正确(只是返回值):

def receive = {
  case DataFetch =>
    data
}

正确(将值发送到 sender):

  def receive = {
    case DataFetch =>
      sender ! data
  }