如何在 Actor 的同一线程内获取 http.singleRequest(httpRequest) 的 Httpresponse?
How to get the Httpresponse of http.singleRequest(httpRequest) inside the same thread on an Actor?
我有一个 Actor,它使用 httpRequest => http.singleRequest(httpRequest).pipeTo(self)
在一条 case BidRequest
消息中发送 HTTP POST 请求。参与者在另一个 case HttpResponse
消息中接收到 httpResponse。在第二条 case HttpResponse
消息中,我想更改第一个 case BidRequest
消息将发回的变量。因为消息是异步处理的,所以当我在第二条消息上编辑变量时,第一条消息已经发回了旧状态的变量。
我想我需要以某种方式使用 akka.pattern.ask
来不让消息到达另一个 case HttpResponse
,但保持原样 case BidRequest
以便我可以编辑变量到位。
object AuctionClientActor {
def props(bidders: List[String]) = { Props(new AuctionClientActor(bidders)) }
}
class AuctionClientActor(bidders: List[String])
extends Actor with ActorLogging
with BidJsonProtocol with SprayJsonSupport {
import context.dispatcher
implicit val system = context.system
val http = Http(system)
var bidOffer: BidOffer = BidOffer("", 0, "")
def receive = {
case bidRequest@BidRequest(requestId, bid) =>
val content = bidRequest.bid.toJson.toString
val latch = new CountDownLatch(bidders.size)
val listResponseFuture: List[Future[HttpResponse]] = bidders
.map(bidder =>
HttpRequest( // create the request
HttpMethods.POST,
uri = Uri(bidder), // uri = Uri("http://localhost:8081"),
entity = HttpEntity(ContentTypes.`application/json`, content)
)
)
// IF I USE pipeTo HERE THE HttpResponse WILL GO TO ANOTHER CASE
.map(httpRequest => http.singleRequest(httpRequest).pipeTo(self)) // send the request
listResponseFuture.foreach { response =>
Await.result(response, 3 seconds)
response.onComplete {
case Success(value) => latch.countDown // println(s"response success: $value")
case Failure(exception) =>
println(s"response failure: $exception")
latch.countDown
}
}
latch.await(3, TimeUnit.SECONDS)
println("sending response now... BUT bidOffer WAS EDITED IN ANOTHER case thread")
sender() ! Some(bidOffer.content)
bidOffer = BidOffer("", 0, "")
case resp@HttpResponse(StatusCodes.OK, headers, entity, _) =>
log.info(s"received HttpResponse OK(200): $resp")
entity.dataBytes.runFold(ByteString(""))(_ ++ _).foreach { body =>
println("Got response, body: " + body.utf8String)
val newBidOffer = BidOfferConverter.getBidOffer(body.utf8String)
// I SHOULD NOT EDIT bidOffer HERE. INSTEAD I NEED TO EDIT bidOffer ON THE case BidRequest
if (bidOffer.bid == 0) {
println("new")
bidOffer = BidOffer(newBidOffer.id, newBidOffer.bid, newBidOffer.content.replace("$price$", newBidOffer.bid.toString))
} else if (newBidOffer.bid > bidOffer.bid) {
println("replace new")
bidOffer = BidOffer(newBidOffer.id, newBidOffer.bid, newBidOffer.content.replace("$price$", newBidOffer.bid.toString))
} else {
println("none")
}
}
case resp@HttpResponse(code, _, _, _) =>
log.info(s"Request failed, response code: $code")
resp.discardEntityBytes()
}
}
我正在查看此 answer 以将 List[Future]
转换为 Future[List]
,但是当我这样做时,我创建了 Future[List[Any]]
而不是 HttpResponse
了。
下一个代码片段: 所以我试着按照你说的方式做,但我正在创建一个 List[Future[Future[String]]]
。如果我只有一个主机来执行请求,那很容易。但是因为我可以有 1、2 或 3 个请求,所以我创建了一个列表并且代码变得复杂。再加上来自 akka-stream
的 runFold
创建另一个 Future
。你能提示一下如何按照你所说的方式实现它吗?
val responseListFuture: List[Future[Future[String]]] = bidders.map { bidder =>
HttpRequest( // create the request
HttpMethods.POST,
uri = Uri(bidder), // uri = Uri("http://localhost:8081 | 8082 | 8083"),
entity = HttpEntity(ContentTypes.`application/json`, content)
)
}
.map { httpRequest =>
http.singleRequest(httpRequest).pipeTo(self) // this creates the first Future
.map { httpResponse =>
println(s"response: $httpResponse")
// this creates the second Future
httpResponse.entity.dataBytes.runFold(ByteString(""))(_ ++ _).map { body =>
println("Got response, body: " + body.utf8String)
// BidOfferConverter.getBidOffer(body.utf8String)
body.utf8String
}
}
}
简短的回答是你不能,除非在 receive
中阻塞,这是一个主要的禁忌。
这有点X:Y问题的感觉。这里的实际目标是什么?只是您不想在所有请求完成之前发送响应吗?
如果这就是您想要的,那么采取的方法是 map
未来将其转换为消息,其中包含构建响应所需的信息。这样做,您甚至可能不需要 bidOffer
变量。
Future.sequence
会将 Seq[Future[A]]
(以及其他集合类型)折叠成 Future[Seq[A]]
(如果任何期货失败则失败:这可能不是您要找的,在这种情况下,Future
伴随对象中的其他组合器可能更符合您的要求)。
我必须把它放在运行。我还使用 Try
、Option
、getOrElse
以防某些服务器出现故障。所以我还是发了一个HttpResponse
回来。为了完整起见,我将在这里留下答案。如果有人有更好的方法,我很乐意重新考虑。
class AuctionClientActor(bidders: List[String])
extends Actor with ActorLogging
with BidJsonProtocol with SprayJsonSupport {
import context.dispatcher
implicit val system = context.system
val http = Http(system)
def receive = {
case bidRequest@BidRequest(requestId, bid) =>
log.info(s"received bid request: $bidRequest")
val content = bidRequest.bid.toJson.toString
.replace("[[", "{")
.replace("]]", "}")
.replace("\",\"", "\": \"")
.replace("[", "")
.replace("]", "")
val responseListFuture = bidders.map { bidder =>
HttpRequest( // create the request
HttpMethods.POST,
uri = Uri(bidder),
entity = HttpEntity(ContentTypes.`application/json`, content)
)
}
.map { httpRequest =>
val httpResponseFuture = http.singleRequest(httpRequest).pipeTo(self) // this creates the first Future[HttpResponse]
Await.ready(httpResponseFuture, 5 seconds)
httpResponseFuture.value.get.getOrElse(HttpResponse(StatusCodes.NotFound))
}.filter(httpResponse => httpResponse.status == StatusCodes.OK)
.map { httpResponse =>
println(s"response: $httpResponse")
val bidOfferFuture = httpResponse.entity.dataBytes.runFold(ByteString(""))(_ ++ _).map { body =>
println("Got response, body: " + body.utf8String)
BidOfferConverter.getBidOffer(body.utf8String)
}
Await.ready(bidOfferFuture, 5 seconds)
bidOfferFuture.value.get.getOrElse(BidOffer("", 0, ""))
}
responseListFuture.foreach { bidOffer =>
println(s"bidOffer: ${bidOffer.id}, ${bidOffer.bid}, ${bidOffer.content}")
}
val bidOfferWinner = responseListFuture.maxBy(_.bid)
println(s"winner: $bidOfferWinner")
sender() ! Some(bidOfferWinner.content)
}
}
我有一个 Actor,它使用 httpRequest => http.singleRequest(httpRequest).pipeTo(self)
在一条 case BidRequest
消息中发送 HTTP POST 请求。参与者在另一个 case HttpResponse
消息中接收到 httpResponse。在第二条 case HttpResponse
消息中,我想更改第一个 case BidRequest
消息将发回的变量。因为消息是异步处理的,所以当我在第二条消息上编辑变量时,第一条消息已经发回了旧状态的变量。
我想我需要以某种方式使用 akka.pattern.ask
来不让消息到达另一个 case HttpResponse
,但保持原样 case BidRequest
以便我可以编辑变量到位。
object AuctionClientActor {
def props(bidders: List[String]) = { Props(new AuctionClientActor(bidders)) }
}
class AuctionClientActor(bidders: List[String])
extends Actor with ActorLogging
with BidJsonProtocol with SprayJsonSupport {
import context.dispatcher
implicit val system = context.system
val http = Http(system)
var bidOffer: BidOffer = BidOffer("", 0, "")
def receive = {
case bidRequest@BidRequest(requestId, bid) =>
val content = bidRequest.bid.toJson.toString
val latch = new CountDownLatch(bidders.size)
val listResponseFuture: List[Future[HttpResponse]] = bidders
.map(bidder =>
HttpRequest( // create the request
HttpMethods.POST,
uri = Uri(bidder), // uri = Uri("http://localhost:8081"),
entity = HttpEntity(ContentTypes.`application/json`, content)
)
)
// IF I USE pipeTo HERE THE HttpResponse WILL GO TO ANOTHER CASE
.map(httpRequest => http.singleRequest(httpRequest).pipeTo(self)) // send the request
listResponseFuture.foreach { response =>
Await.result(response, 3 seconds)
response.onComplete {
case Success(value) => latch.countDown // println(s"response success: $value")
case Failure(exception) =>
println(s"response failure: $exception")
latch.countDown
}
}
latch.await(3, TimeUnit.SECONDS)
println("sending response now... BUT bidOffer WAS EDITED IN ANOTHER case thread")
sender() ! Some(bidOffer.content)
bidOffer = BidOffer("", 0, "")
case resp@HttpResponse(StatusCodes.OK, headers, entity, _) =>
log.info(s"received HttpResponse OK(200): $resp")
entity.dataBytes.runFold(ByteString(""))(_ ++ _).foreach { body =>
println("Got response, body: " + body.utf8String)
val newBidOffer = BidOfferConverter.getBidOffer(body.utf8String)
// I SHOULD NOT EDIT bidOffer HERE. INSTEAD I NEED TO EDIT bidOffer ON THE case BidRequest
if (bidOffer.bid == 0) {
println("new")
bidOffer = BidOffer(newBidOffer.id, newBidOffer.bid, newBidOffer.content.replace("$price$", newBidOffer.bid.toString))
} else if (newBidOffer.bid > bidOffer.bid) {
println("replace new")
bidOffer = BidOffer(newBidOffer.id, newBidOffer.bid, newBidOffer.content.replace("$price$", newBidOffer.bid.toString))
} else {
println("none")
}
}
case resp@HttpResponse(code, _, _, _) =>
log.info(s"Request failed, response code: $code")
resp.discardEntityBytes()
}
}
我正在查看此 answer 以将 List[Future]
转换为 Future[List]
,但是当我这样做时,我创建了 Future[List[Any]]
而不是 HttpResponse
了。
下一个代码片段: 所以我试着按照你说的方式做,但我正在创建一个 List[Future[Future[String]]]
。如果我只有一个主机来执行请求,那很容易。但是因为我可以有 1、2 或 3 个请求,所以我创建了一个列表并且代码变得复杂。再加上来自 akka-stream
的 runFold
创建另一个 Future
。你能提示一下如何按照你所说的方式实现它吗?
val responseListFuture: List[Future[Future[String]]] = bidders.map { bidder =>
HttpRequest( // create the request
HttpMethods.POST,
uri = Uri(bidder), // uri = Uri("http://localhost:8081 | 8082 | 8083"),
entity = HttpEntity(ContentTypes.`application/json`, content)
)
}
.map { httpRequest =>
http.singleRequest(httpRequest).pipeTo(self) // this creates the first Future
.map { httpResponse =>
println(s"response: $httpResponse")
// this creates the second Future
httpResponse.entity.dataBytes.runFold(ByteString(""))(_ ++ _).map { body =>
println("Got response, body: " + body.utf8String)
// BidOfferConverter.getBidOffer(body.utf8String)
body.utf8String
}
}
}
简短的回答是你不能,除非在 receive
中阻塞,这是一个主要的禁忌。
这有点X:Y问题的感觉。这里的实际目标是什么?只是您不想在所有请求完成之前发送响应吗?
如果这就是您想要的,那么采取的方法是 map
未来将其转换为消息,其中包含构建响应所需的信息。这样做,您甚至可能不需要 bidOffer
变量。
Future.sequence
会将 Seq[Future[A]]
(以及其他集合类型)折叠成 Future[Seq[A]]
(如果任何期货失败则失败:这可能不是您要找的,在这种情况下,Future
伴随对象中的其他组合器可能更符合您的要求)。
我必须把它放在运行。我还使用 Try
、Option
、getOrElse
以防某些服务器出现故障。所以我还是发了一个HttpResponse
回来。为了完整起见,我将在这里留下答案。如果有人有更好的方法,我很乐意重新考虑。
class AuctionClientActor(bidders: List[String])
extends Actor with ActorLogging
with BidJsonProtocol with SprayJsonSupport {
import context.dispatcher
implicit val system = context.system
val http = Http(system)
def receive = {
case bidRequest@BidRequest(requestId, bid) =>
log.info(s"received bid request: $bidRequest")
val content = bidRequest.bid.toJson.toString
.replace("[[", "{")
.replace("]]", "}")
.replace("\",\"", "\": \"")
.replace("[", "")
.replace("]", "")
val responseListFuture = bidders.map { bidder =>
HttpRequest( // create the request
HttpMethods.POST,
uri = Uri(bidder),
entity = HttpEntity(ContentTypes.`application/json`, content)
)
}
.map { httpRequest =>
val httpResponseFuture = http.singleRequest(httpRequest).pipeTo(self) // this creates the first Future[HttpResponse]
Await.ready(httpResponseFuture, 5 seconds)
httpResponseFuture.value.get.getOrElse(HttpResponse(StatusCodes.NotFound))
}.filter(httpResponse => httpResponse.status == StatusCodes.OK)
.map { httpResponse =>
println(s"response: $httpResponse")
val bidOfferFuture = httpResponse.entity.dataBytes.runFold(ByteString(""))(_ ++ _).map { body =>
println("Got response, body: " + body.utf8String)
BidOfferConverter.getBidOffer(body.utf8String)
}
Await.ready(bidOfferFuture, 5 seconds)
bidOfferFuture.value.get.getOrElse(BidOffer("", 0, ""))
}
responseListFuture.foreach { bidOffer =>
println(s"bidOffer: ${bidOffer.id}, ${bidOffer.bid}, ${bidOffer.content}")
}
val bidOfferWinner = responseListFuture.maxBy(_.bid)
println(s"winner: $bidOfferWinner")
sender() ! Some(bidOfferWinner.content)
}
}