如何使用管道模式处理未类型化 Actor 中的 Future HttpResponse
How to handle a Future HttpResponse within an untyped Actor using pipe pattern
在我的项目中,我必须编写一个休息客户端,它将接收来自休息服务的 HttpResponse 作为未来。我想要的是记录响应的状态代码,如果有任何异常,也记录该异常。我怎样才能使用管道模式实现这一点。 PFB 我的代码片段:
class MetadataAggregator(implicit config: Config) extends Actor with ActorLogging {
import context.system
import akka.pattern.pipe
implicit val exec = context.system.dispatcher
val url = sys.env.getOrElse("URL", config.getString("conf.url"))
override def receive: Receive = {
case MetadataEvent(event, phase, topic) =>
val payload = captureMetadata(event, phase, topic)
publishMetadata(payload)
case _ => //Do nothing
}
def publishMetadata(payload: String) = {
log.info(s"Metadata payload : $payload")
val responseFuture = Http().singleRequest(
HttpRequest(
HttpMethods.POST,
uri = url,
entity = HttpEntity(
ContentTypes.`application/json`,
payload
)
)
)
responseFuture.pipeTo(self)
}
override def supervisorStrategy: SupervisorStrategy = OneForOneStrategy() {
case ex: RuntimeException =>
log.error(s"${self.path} incurred an exception. $ex...")
ex.printStackTrace()
log.info("Resuming...")
Resume
case any: Any =>
log.error(s"${self.path} stopping due to $any ...")
any.printStackTrace()
Stop
}
}
Bdw,我不能使用 akka 类型的 actor,因为整个项目都在使用无类型的 actor。
pipeTo
调用将 HttpResponse
发送给 actor,因此您需要在 receive
方法中处理它。但我建议创建一条包含有效负载和响应的新消息,并将其发送到 self
。这允许您描述导致响应的负载。
HttpResponse
被 case _ =>
捕获并被忽略,因此通常最好记录任何意外消息,以便更早地捕获此类事件。
示例代码:
为结果创建一个新的class:
case class PublishResult(payload: String, result: Try[HttpResponse])
在publishMetadata
中:
val responseFuture = Http().singleRequest(???)
responseFuture.onComplete{ res =>
self ! PublishResult(payload, res)
}
在 receive
中添加此处理程序:
case PublishResult(payload, res) =>
res match {
case Failure(e) =>
log.warn("Request payload {} failed: {}", payload, e.getMessage)
case Success(response) =>
log.debug("Request succeeded")
}
在我的项目中,我必须编写一个休息客户端,它将接收来自休息服务的 HttpResponse 作为未来。我想要的是记录响应的状态代码,如果有任何异常,也记录该异常。我怎样才能使用管道模式实现这一点。 PFB 我的代码片段:
class MetadataAggregator(implicit config: Config) extends Actor with ActorLogging {
import context.system
import akka.pattern.pipe
implicit val exec = context.system.dispatcher
val url = sys.env.getOrElse("URL", config.getString("conf.url"))
override def receive: Receive = {
case MetadataEvent(event, phase, topic) =>
val payload = captureMetadata(event, phase, topic)
publishMetadata(payload)
case _ => //Do nothing
}
def publishMetadata(payload: String) = {
log.info(s"Metadata payload : $payload")
val responseFuture = Http().singleRequest(
HttpRequest(
HttpMethods.POST,
uri = url,
entity = HttpEntity(
ContentTypes.`application/json`,
payload
)
)
)
responseFuture.pipeTo(self)
}
override def supervisorStrategy: SupervisorStrategy = OneForOneStrategy() {
case ex: RuntimeException =>
log.error(s"${self.path} incurred an exception. $ex...")
ex.printStackTrace()
log.info("Resuming...")
Resume
case any: Any =>
log.error(s"${self.path} stopping due to $any ...")
any.printStackTrace()
Stop
}
}
Bdw,我不能使用 akka 类型的 actor,因为整个项目都在使用无类型的 actor。
pipeTo
调用将 HttpResponse
发送给 actor,因此您需要在 receive
方法中处理它。但我建议创建一条包含有效负载和响应的新消息,并将其发送到 self
。这允许您描述导致响应的负载。
HttpResponse
被 case _ =>
捕获并被忽略,因此通常最好记录任何意外消息,以便更早地捕获此类事件。
示例代码:
为结果创建一个新的class:
case class PublishResult(payload: String, result: Try[HttpResponse])
在publishMetadata
中:
val responseFuture = Http().singleRequest(???)
responseFuture.onComplete{ res =>
self ! PublishResult(payload, res)
}
在 receive
中添加此处理程序:
case PublishResult(payload, res) =>
res match {
case Failure(e) =>
log.warn("Request payload {} failed: {}", payload, e.getMessage)
case Success(response) =>
log.debug("Request succeeded")
}