Scala Akka Actors:如何将 Http 响应的结果发送回发件人?

Scala Akka Actors: How to send the result of an Http response back to the sender?

我正在尝试在 Akka Actor 中执行以下 Scala 代码。

class FilteringService(implicit timeout: Timeout) extends Actor {

  def receive: PartialFunction[Any, Unit] = {

    case GetProfiles ⇒
      val requester = sender
      def getProfiles = {

        var result = new Array[Profile](0)

        println("[GET-PROFILES] Entered, making request")
        val req = Get("http://localhost:9090/profiles")
        implicit val profileFormat = jsonFormat16(Profile)


        val responseFuture: Future[HttpResponse] = Http().singleRequest(req)
        println("[GET-PROFILES] Entered, request sent")
        responseFuture.onComplete {

          case Success(response) =>
            println("[RES - SUCCESS] Request returned with " + response.status)
            val responseAsProfiles =  Unmarshal(response.entity).to[Array[Profile]]
            responseAsProfiles.onComplete {
            println("[UNMARSH - SUCCESS] Unmarshaling Done!")
            _.get match {
              case profiles: Array[Profile] =>
                println("[UNMARSH - SUCCESS] Sending Profiles message to " + sender())
                requester ! profiles
                println("[UNMARSH - SUCCESS] Message sent to " + sender())
              case _ => println("error")
              }
            }

          case Failure(_)   =>
            sys.error("something wrong")
            //return Future[Array[Profile]]

        }
      }
      println("[RECEIVE] Message GetProfiles received from " + sender().toString())
      getProfiles
      println("[RECEIVE] Message GetProfiles invoked")

  }

当 Actor 收到消息“GetProfiles”时:

1- 它向远程服务器发送请求,因此操作的结果是 Future[HttpResponse]

2- 如果成功,它会检索响应(一个 JSON 数组)并要求将对象解组到 Array[Profile]。 (Profile 模型并不重要)。 Unmarshall 方法的结果是 Future[Array[Profile]]

3-万一成功,我要将结果回传给原发件人!

我设法做到了这一点,但这是一个技巧,因为我将发件人保存在一个变量中,该变量在范围内可见 (requester)。 我知道有管道模式,所以理论上我可以将 responseAsProfiles 对象发送回发件人,但该对象是在 responseFuture 的 onComplete 方法中创建的 对象(我们必须等待,当然!)

就是这样! 在这种情况下,如何使用管道模式将结果发送回发件人? 提前致谢!!!

一般的想法是,您使用 mapflatMap 组合期货,并尽量避免使用 onComplete

看看您是否可以将您的代码转换为以下更小的部分,然后进行组合:

def getRawProfileData(): Future[HttpResponse] = {
 // ... here you make http request
}

def unmarshalProfiles(response: HttpResponse): Future[List[Profile]] = {
  // ... unmarshalling logic
}

def getProfiles(): Future[List[Profile]] = getRawProfileData().flatMape(unmarshalProfiles)

// now from receive block

case GetProfiles ⇒ getProfiles().pipeTo(sender())