如何使用管道模式处理未类型化 Actor 中的 Future HttpResponse

How to handle a Future HttpResponse within an untyped Actor using pipe pattern

在我的项目中,我必须编写一个休息客户端,它将接收来自休息服务的 HttpResponse 作为未来。我想要的是记录响应的状态代码,如果有任何异常,也记录该异常。我怎样才能使用管道模式实现这一点。 PFB 我的代码片段:

class MetadataAggregator(implicit config: Config) extends Actor with ActorLogging {

  import context.system
  import akka.pattern.pipe

  implicit val exec = context.system.dispatcher

  val url = sys.env.getOrElse("URL", config.getString("conf.url"))

  override def receive: Receive = {
    case MetadataEvent(event, phase, topic) =>
      val payload = captureMetadata(event, phase, topic)
      publishMetadata(payload)
    case _ => //Do nothing
  }

  
  def publishMetadata(payload: String) = {
    log.info(s"Metadata payload : $payload")
    val responseFuture = Http().singleRequest(
      HttpRequest(
        HttpMethods.POST,
        uri = url,
        entity = HttpEntity(
          ContentTypes.`application/json`,
          payload
        )
      )
    )
  
    responseFuture.pipeTo(self)
  
  }

  override def supervisorStrategy: SupervisorStrategy = OneForOneStrategy() {
    case ex: RuntimeException =>
      log.error(s"${self.path} incurred an exception. $ex...")
      ex.printStackTrace()
      log.info("Resuming...")
      Resume
    case any: Any =>
      log.error(s"${self.path} stopping due to $any ...")
      any.printStackTrace()
      Stop
  }
}

Bdw,我不能使用 akka 类型的 actor,因为整个项目都在使用无类型的 actor。

pipeTo 调用将 HttpResponse 发送给 actor,因此您需要在 receive 方法中处理它。但我建议创建一条包含有效负载和响应的新消息,并将其发送到 self。这允许您描述导致响应的负载。

HttpResponsecase _ => 捕获并被忽略,因此通常最好记录任何意外消息,以便更早地捕获此类事件。


示例代码:

为结果创建一个新的class:

 case class PublishResult(payload: String, result: Try[HttpResponse])

publishMetadata中:

 val responseFuture = Http().singleRequest(???)

 responseFuture.onComplete{ res =>
   self ! PublishResult(payload, res)
 }
 

receive 中添加此处理程序:

 case PublishResult(payload, res) =>
   res match {
     case Failure(e) =>
       log.warn("Request payload {} failed: {}", payload, e.getMessage)
     case Success(response) =>
       log.debug("Request succeeded")
   }