如何在 like actor 中使用 like-http cachedHostConnectionPool?
How to use the akka-http cachedHostConnectionPool inside an akka actor?
akka http 文档在 high level client request API documentation 中提到我们不应该在 actor 的 Future 中使用 access actor state form。
相反,这应该是使用的模式:
import akka.actor.Actor
import akka.http.scaladsl.Http
import akka.http.scaladsl.model._
import akka.stream.scaladsl.ImplicitMaterializer
class Myself extends Actor
with ImplicitMaterializer
with ActorLogging {
import akka.pattern.pipe
import context.dispatcher
val http = Http(context.system)
override def preStart() = {
http.singleRequest(HttpRequest(uri = "http://akka.io"))
.pipeTo(self)
}
def receive = {
case HttpResponse(StatusCodes.OK, headers, entity, _) =>
log.info("Got response, body: " + entity.dataBytes.runFold(ByteString(""))(_ ++ _))
case HttpResponse(code, _, _, _) =>
log.info("Request failed, response code: " + code)
}
}
我们是否应该在使用 cachedHostConnectionPool
时做类似的事情?
例如:
import akka.actor.Actor
import akka.http.scaladsl.Http
import akka.http.scaladsl.model._
import akka.stream.scaladsl.ImplicitMaterializer
class Myself extends Actor
with ImplicitMaterializer
with ActorLogging {
import akka.pattern.pipe
import context.dispatcher
var state = 10
val http = Http(context.system)
val pool = http.cachedHostConnectionPoolTls[Int](apiEndpoint.authority.host.toString())
override def preStart() = {
Source.single(HttpRequest(uri = "http://akka.io") -> 42)
.via(poolClientFlow)
.runWith(Sink.head)
.pipeTo(self)
}
def receive = {
case (res, ref) => ref match {
case 42 => state -= 1 // Do something with the response
}
}
}
如果是这样,我们为什么需要它?在文档中找不到解释
如果不是,正确的模式是什么?
谢谢
如我的评论所述,如果您需要改变 actor 的内部状态,您应该使用 pipeTo
将 Future
的结果发送回您自己进行处理。如果您不这样做,您将 运行 冒着 运行 陷入内部状态并发修改问题的风险,并失去使用 actor 的好处。 Future
的 post 完成逻辑不会在参与者邮箱处理的上下文中执行,因此可能与邮箱中的消息同时执行,从而导致潜在的并发问题。这就是为什么如果您需要在 Future
完成后改变状态,建议通过管道返回给自己。
现在,如果之后没有状态管理器,那么您不必使用 pipeTo
,因为并发修改状态不会成为问题。
akka http 文档在 high level client request API documentation 中提到我们不应该在 actor 的 Future 中使用 access actor state form。
相反,这应该是使用的模式:
import akka.actor.Actor
import akka.http.scaladsl.Http
import akka.http.scaladsl.model._
import akka.stream.scaladsl.ImplicitMaterializer
class Myself extends Actor
with ImplicitMaterializer
with ActorLogging {
import akka.pattern.pipe
import context.dispatcher
val http = Http(context.system)
override def preStart() = {
http.singleRequest(HttpRequest(uri = "http://akka.io"))
.pipeTo(self)
}
def receive = {
case HttpResponse(StatusCodes.OK, headers, entity, _) =>
log.info("Got response, body: " + entity.dataBytes.runFold(ByteString(""))(_ ++ _))
case HttpResponse(code, _, _, _) =>
log.info("Request failed, response code: " + code)
}
}
我们是否应该在使用 cachedHostConnectionPool
时做类似的事情?
例如:
import akka.actor.Actor
import akka.http.scaladsl.Http
import akka.http.scaladsl.model._
import akka.stream.scaladsl.ImplicitMaterializer
class Myself extends Actor
with ImplicitMaterializer
with ActorLogging {
import akka.pattern.pipe
import context.dispatcher
var state = 10
val http = Http(context.system)
val pool = http.cachedHostConnectionPoolTls[Int](apiEndpoint.authority.host.toString())
override def preStart() = {
Source.single(HttpRequest(uri = "http://akka.io") -> 42)
.via(poolClientFlow)
.runWith(Sink.head)
.pipeTo(self)
}
def receive = {
case (res, ref) => ref match {
case 42 => state -= 1 // Do something with the response
}
}
}
如果是这样,我们为什么需要它?在文档中找不到解释 如果不是,正确的模式是什么?
谢谢
如我的评论所述,如果您需要改变 actor 的内部状态,您应该使用 pipeTo
将 Future
的结果发送回您自己进行处理。如果您不这样做,您将 运行 冒着 运行 陷入内部状态并发修改问题的风险,并失去使用 actor 的好处。 Future
的 post 完成逻辑不会在参与者邮箱处理的上下文中执行,因此可能与邮箱中的消息同时执行,从而导致潜在的并发问题。这就是为什么如果您需要在 Future
完成后改变状态,建议通过管道返回给自己。
现在,如果之后没有状态管理器,那么您不必使用 pipeTo
,因为并发修改状态不会成为问题。