完成请求后如何关闭 STTP 后端?

How can I close the STTP backend after completing my requests?

我目前正在学习和尝试使用 Monix 后端的 STTP。我主要是在处理完所有请求(每个请求都是一个任务)后关闭后端。

我创建了 sample/mock 代码来解决我的问题(据我所知,我的问题更普遍,而不是特定于我的代码):

import sttp.client.asynchttpclient.monix._
import monix.eval.Task
import monix.reactive.Observable
import sttp.client.{Response, UriContext}

import scala.concurrent.duration.DurationInt

object ObservableTest extends App {

  val activities = AsyncHttpClientMonixBackend().flatMap { implicit backend =>
    val ids: Task[List[Int]] = Task { (1 to 3).toList }
    val f: String => Task[Response[Either[String, String]]] = (i: String) => fetch(uri"$i", "")
    val data: Task[List[Task[Response[Either[String, String]]]]] = ids map (_ map (_ => f("https://heloooo.free.beeceptor.com/my/api/path")))
    data.guarantee(backend.close()) // If I close the backend here, I can' generate requests after (when processing the actual requests in the list)
    // I have attempted to return a Task containing a tuple of (data, backend) but closing the backend from outside of the scope did not work as I expected
  }
  import monix.execution.Scheduler.Implicits.global
  val obs = Observable
    .fromTask(activities)
    .flatMap { listOfFetches =>
      Observable.fromIterable(listOfFetches)
    }
    .throttle(3 second, 1)
    .map(_.runToFuture)

  obs.subscribe()
}

我的 fetch (api call maker) 函数如下所示:

  def fetch(uri: Uri, auth: String)(implicit
      backend: SttpBackend[Task, Observable[ByteBuffer], WebSocketHandler]
  ) = {
    println(uri)
    val task = basicRequest
      .get(uri)
      .header("accept", "application/json")
      .header("Authorization", auth)
      .response(asString)
      .send()

    task
  }

由于我的主要任务包含我稍后需要处理的其他任务,因此我需要找到一种从外部关闭 Monix 后端的替代方法。在我使用 List[Task[Response[Either[String, String]]]] 中的请求后,是否有一种干净的方法来关闭后端?

问题来自这样一个事实,即在打开 sttp 后端的情况下,您正在计算要执行的任务列表 - List[Task[Response[Either[String, String]]]],但您没有 运行 宁他们。因此,我们需要在后端关闭之前对这些任务进行排序 运行。

这里要做的关键是创建任务的单一描述,即 运行 在后端仍然打开时所有这些请求。

一旦你计算 data(它本身是一个任务 - 计算的描述 - 当 运行 时,产生任务列表 - 也是计算的描述),我们需要将其转换为单个 non-nested Task。这可以通过多种方式完成(例如使用简单的排序),但在您的情况下,这将使用 Observable:

AsyncHttpClientMonixBackend().flatMap { implicit backend =>
  val ids: Task[List[Int]] = Task { (1 to 3).toList }
  val f: String => Task[Response[Either[String, String]]] = (i: String) => fetch(uri"$i", "")
  val data: Task[List[Task[Response[Either[String, String]]]]] =
    ids map (_ map (_ => f("https://heloooo.free.beeceptor.com/my/api/path")))

  val activities = Observable
    .fromTask(data)
    .flatMap { listOfFetches =>
      Observable.fromIterable(listOfFetches)
    }
    .throttle(3 second, 1)
    .mapEval(identity)
    .completedL

  activities.guarantee(
    backend.close()
  )
}

首先请注意 Observable.fromTask(...) 在最外面的 flatMap 里面,所以是在后端仍然打开时创建的。我们创建可观察对象,限制它等,然后是关键事实:一旦我们有了限制流,我们评估每个项目(每个项目都是一个 Task[...] - 如何发送 na http 请求的描述)使用mapEval。我们得到了 Either[String, String] 的流,这是请求的结果。

最后,我们使用 .completedL(丢弃结果)将流转换为 Task,等待整个流完成。

这个最后的任务然后通过关闭后端来排序。如上所述,将发生的 side-effects 的最终序列是:

  1. 打开后台
  2. 创建任务列表(data
  3. 创建一个流,它会限制由 data
  4. 计算的列表中的元素
  5. 评估流中的每个项目(发送请求)
  6. 关闭后台