Scala Akka Actors:如何将 Http 响应的结果发送回发件人?
Scala Akka Actors: How to send the result of an Http response back to the sender?
我正在尝试在 Akka Actor 中执行以下 Scala 代码。
class FilteringService(implicit timeout: Timeout) extends Actor {
def receive: PartialFunction[Any, Unit] = {
case GetProfiles ⇒
val requester = sender
def getProfiles = {
var result = new Array[Profile](0)
println("[GET-PROFILES] Entered, making request")
val req = Get("http://localhost:9090/profiles")
implicit val profileFormat = jsonFormat16(Profile)
val responseFuture: Future[HttpResponse] = Http().singleRequest(req)
println("[GET-PROFILES] Entered, request sent")
responseFuture.onComplete {
case Success(response) =>
println("[RES - SUCCESS] Request returned with " + response.status)
val responseAsProfiles = Unmarshal(response.entity).to[Array[Profile]]
responseAsProfiles.onComplete {
println("[UNMARSH - SUCCESS] Unmarshaling Done!")
_.get match {
case profiles: Array[Profile] =>
println("[UNMARSH - SUCCESS] Sending Profiles message to " + sender())
requester ! profiles
println("[UNMARSH - SUCCESS] Message sent to " + sender())
case _ => println("error")
}
}
case Failure(_) =>
sys.error("something wrong")
//return Future[Array[Profile]]
}
}
println("[RECEIVE] Message GetProfiles received from " + sender().toString())
getProfiles
println("[RECEIVE] Message GetProfiles invoked")
}
当 Actor 收到消息“GetProfiles”时:
1- 它向远程服务器发送请求,因此操作的结果是 Future[HttpResponse]
2- 如果成功,它会检索响应(一个 JSON 数组)并要求将对象解组到 Array[Profile]。 (Profile 模型并不重要)。 Unmarshall 方法的结果是 Future[Array[Profile]]
3-万一成功,我要将结果回传给原发件人!
我设法做到了这一点,但这是一个技巧,因为我将发件人保存在一个变量中,该变量在范围内可见 (requester)。
我知道有管道模式,所以理论上我可以将 responseAsProfiles 对象发送回发件人,但该对象是在 responseFuture 的 onComplete 方法中创建的 对象(我们必须等待,当然!)
就是这样!
在这种情况下,如何使用管道模式将结果发送回发件人?
提前致谢!!!
一般的想法是,您使用 map
和 flatMap
组合期货,并尽量避免使用 onComplete
。
看看您是否可以将您的代码转换为以下更小的部分,然后进行组合:
def getRawProfileData(): Future[HttpResponse] = {
// ... here you make http request
}
def unmarshalProfiles(response: HttpResponse): Future[List[Profile]] = {
// ... unmarshalling logic
}
def getProfiles(): Future[List[Profile]] = getRawProfileData().flatMape(unmarshalProfiles)
// now from receive block
case GetProfiles ⇒ getProfiles().pipeTo(sender())
我正在尝试在 Akka Actor 中执行以下 Scala 代码。
class FilteringService(implicit timeout: Timeout) extends Actor {
def receive: PartialFunction[Any, Unit] = {
case GetProfiles ⇒
val requester = sender
def getProfiles = {
var result = new Array[Profile](0)
println("[GET-PROFILES] Entered, making request")
val req = Get("http://localhost:9090/profiles")
implicit val profileFormat = jsonFormat16(Profile)
val responseFuture: Future[HttpResponse] = Http().singleRequest(req)
println("[GET-PROFILES] Entered, request sent")
responseFuture.onComplete {
case Success(response) =>
println("[RES - SUCCESS] Request returned with " + response.status)
val responseAsProfiles = Unmarshal(response.entity).to[Array[Profile]]
responseAsProfiles.onComplete {
println("[UNMARSH - SUCCESS] Unmarshaling Done!")
_.get match {
case profiles: Array[Profile] =>
println("[UNMARSH - SUCCESS] Sending Profiles message to " + sender())
requester ! profiles
println("[UNMARSH - SUCCESS] Message sent to " + sender())
case _ => println("error")
}
}
case Failure(_) =>
sys.error("something wrong")
//return Future[Array[Profile]]
}
}
println("[RECEIVE] Message GetProfiles received from " + sender().toString())
getProfiles
println("[RECEIVE] Message GetProfiles invoked")
}
当 Actor 收到消息“GetProfiles”时:
1- 它向远程服务器发送请求,因此操作的结果是 Future[HttpResponse]
2- 如果成功,它会检索响应(一个 JSON 数组)并要求将对象解组到 Array[Profile]。 (Profile 模型并不重要)。 Unmarshall 方法的结果是 Future[Array[Profile]]
3-万一成功,我要将结果回传给原发件人!
我设法做到了这一点,但这是一个技巧,因为我将发件人保存在一个变量中,该变量在范围内可见 (requester)。 我知道有管道模式,所以理论上我可以将 responseAsProfiles 对象发送回发件人,但该对象是在 responseFuture 的 onComplete 方法中创建的 对象(我们必须等待,当然!)
就是这样! 在这种情况下,如何使用管道模式将结果发送回发件人? 提前致谢!!!
一般的想法是,您使用 map
和 flatMap
组合期货,并尽量避免使用 onComplete
。
看看您是否可以将您的代码转换为以下更小的部分,然后进行组合:
def getRawProfileData(): Future[HttpResponse] = {
// ... here you make http request
}
def unmarshalProfiles(response: HttpResponse): Future[List[Profile]] = {
// ... unmarshalling logic
}
def getProfiles(): Future[List[Profile]] = getRawProfileData().flatMape(unmarshalProfiles)
// now from receive block
case GetProfiles ⇒ getProfiles().pipeTo(sender())