完成请求后如何关闭 STTP 后端?
How can I close the STTP backend after completing my requests?
我目前正在学习和尝试使用 Monix 后端的 STTP。我主要是在处理完所有请求(每个请求都是一个任务)后关闭后端。
我创建了 sample/mock 代码来解决我的问题(据我所知,我的问题更普遍,而不是特定于我的代码):
import sttp.client.asynchttpclient.monix._
import monix.eval.Task
import monix.reactive.Observable
import sttp.client.{Response, UriContext}
import scala.concurrent.duration.DurationInt
object ObservableTest extends App {
val activities = AsyncHttpClientMonixBackend().flatMap { implicit backend =>
val ids: Task[List[Int]] = Task { (1 to 3).toList }
val f: String => Task[Response[Either[String, String]]] = (i: String) => fetch(uri"$i", "")
val data: Task[List[Task[Response[Either[String, String]]]]] = ids map (_ map (_ => f("https://heloooo.free.beeceptor.com/my/api/path")))
data.guarantee(backend.close()) // If I close the backend here, I can' generate requests after (when processing the actual requests in the list)
// I have attempted to return a Task containing a tuple of (data, backend) but closing the backend from outside of the scope did not work as I expected
}
import monix.execution.Scheduler.Implicits.global
val obs = Observable
.fromTask(activities)
.flatMap { listOfFetches =>
Observable.fromIterable(listOfFetches)
}
.throttle(3 second, 1)
.map(_.runToFuture)
obs.subscribe()
}
我的 fetch (api call maker) 函数如下所示:
def fetch(uri: Uri, auth: String)(implicit
backend: SttpBackend[Task, Observable[ByteBuffer], WebSocketHandler]
) = {
println(uri)
val task = basicRequest
.get(uri)
.header("accept", "application/json")
.header("Authorization", auth)
.response(asString)
.send()
task
}
由于我的主要任务包含我稍后需要处理的其他任务,因此我需要找到一种从外部关闭 Monix 后端的替代方法。在我使用 List[Task[Response[Either[String, String]]]]
中的请求后,是否有一种干净的方法来关闭后端?
问题来自这样一个事实,即在打开 sttp 后端的情况下,您正在计算要执行的任务列表 - List[Task[Response[Either[String, String]]]]
,但您没有 运行 宁他们。因此,我们需要在后端关闭之前对这些任务进行排序 运行。
这里要做的关键是创建任务的单一描述,即 运行 在后端仍然打开时所有这些请求。
一旦你计算 data
(它本身是一个任务 - 计算的描述 - 当 运行 时,产生任务列表 - 也是计算的描述),我们需要将其转换为单个 non-nested Task
。这可以通过多种方式完成(例如使用简单的排序),但在您的情况下,这将使用 Observable
:
AsyncHttpClientMonixBackend().flatMap { implicit backend =>
val ids: Task[List[Int]] = Task { (1 to 3).toList }
val f: String => Task[Response[Either[String, String]]] = (i: String) => fetch(uri"$i", "")
val data: Task[List[Task[Response[Either[String, String]]]]] =
ids map (_ map (_ => f("https://heloooo.free.beeceptor.com/my/api/path")))
val activities = Observable
.fromTask(data)
.flatMap { listOfFetches =>
Observable.fromIterable(listOfFetches)
}
.throttle(3 second, 1)
.mapEval(identity)
.completedL
activities.guarantee(
backend.close()
)
}
首先请注意 Observable.fromTask(...)
在最外面的 flatMap
里面,所以是在后端仍然打开时创建的。我们创建可观察对象,限制它等,然后是关键事实:一旦我们有了限制流,我们评估每个项目(每个项目都是一个 Task[...]
- 如何发送 na http 请求的描述)使用mapEval
。我们得到了 Either[String, String]
的流,这是请求的结果。
最后,我们使用 .completedL
(丢弃结果)将流转换为 Task
,等待整个流完成。
这个最后的任务然后通过关闭后端来排序。如上所述,将发生的 side-effects 的最终序列是:
- 打开后台
- 创建任务列表(
data
)
- 创建一个流,它会限制由
data
计算的列表中的元素
- 评估流中的每个项目(发送请求)
- 关闭后台
我目前正在学习和尝试使用 Monix 后端的 STTP。我主要是在处理完所有请求(每个请求都是一个任务)后关闭后端。
我创建了 sample/mock 代码来解决我的问题(据我所知,我的问题更普遍,而不是特定于我的代码):
import sttp.client.asynchttpclient.monix._
import monix.eval.Task
import monix.reactive.Observable
import sttp.client.{Response, UriContext}
import scala.concurrent.duration.DurationInt
object ObservableTest extends App {
val activities = AsyncHttpClientMonixBackend().flatMap { implicit backend =>
val ids: Task[List[Int]] = Task { (1 to 3).toList }
val f: String => Task[Response[Either[String, String]]] = (i: String) => fetch(uri"$i", "")
val data: Task[List[Task[Response[Either[String, String]]]]] = ids map (_ map (_ => f("https://heloooo.free.beeceptor.com/my/api/path")))
data.guarantee(backend.close()) // If I close the backend here, I can' generate requests after (when processing the actual requests in the list)
// I have attempted to return a Task containing a tuple of (data, backend) but closing the backend from outside of the scope did not work as I expected
}
import monix.execution.Scheduler.Implicits.global
val obs = Observable
.fromTask(activities)
.flatMap { listOfFetches =>
Observable.fromIterable(listOfFetches)
}
.throttle(3 second, 1)
.map(_.runToFuture)
obs.subscribe()
}
我的 fetch (api call maker) 函数如下所示:
def fetch(uri: Uri, auth: String)(implicit
backend: SttpBackend[Task, Observable[ByteBuffer], WebSocketHandler]
) = {
println(uri)
val task = basicRequest
.get(uri)
.header("accept", "application/json")
.header("Authorization", auth)
.response(asString)
.send()
task
}
由于我的主要任务包含我稍后需要处理的其他任务,因此我需要找到一种从外部关闭 Monix 后端的替代方法。在我使用 List[Task[Response[Either[String, String]]]]
中的请求后,是否有一种干净的方法来关闭后端?
问题来自这样一个事实,即在打开 sttp 后端的情况下,您正在计算要执行的任务列表 - List[Task[Response[Either[String, String]]]]
,但您没有 运行 宁他们。因此,我们需要在后端关闭之前对这些任务进行排序 运行。
这里要做的关键是创建任务的单一描述,即 运行 在后端仍然打开时所有这些请求。
一旦你计算 data
(它本身是一个任务 - 计算的描述 - 当 运行 时,产生任务列表 - 也是计算的描述),我们需要将其转换为单个 non-nested Task
。这可以通过多种方式完成(例如使用简单的排序),但在您的情况下,这将使用 Observable
:
AsyncHttpClientMonixBackend().flatMap { implicit backend =>
val ids: Task[List[Int]] = Task { (1 to 3).toList }
val f: String => Task[Response[Either[String, String]]] = (i: String) => fetch(uri"$i", "")
val data: Task[List[Task[Response[Either[String, String]]]]] =
ids map (_ map (_ => f("https://heloooo.free.beeceptor.com/my/api/path")))
val activities = Observable
.fromTask(data)
.flatMap { listOfFetches =>
Observable.fromIterable(listOfFetches)
}
.throttle(3 second, 1)
.mapEval(identity)
.completedL
activities.guarantee(
backend.close()
)
}
首先请注意 Observable.fromTask(...)
在最外面的 flatMap
里面,所以是在后端仍然打开时创建的。我们创建可观察对象,限制它等,然后是关键事实:一旦我们有了限制流,我们评估每个项目(每个项目都是一个 Task[...]
- 如何发送 na http 请求的描述)使用mapEval
。我们得到了 Either[String, String]
的流,这是请求的结果。
最后,我们使用 .completedL
(丢弃结果)将流转换为 Task
,等待整个流完成。
这个最后的任务然后通过关闭后端来排序。如上所述,将发生的 side-effects 的最终序列是:
- 打开后台
- 创建任务列表(
data
) - 创建一个流,它会限制由
data
计算的列表中的元素
- 评估流中的每个项目(发送请求)
- 关闭后台