如何限制通过 Monix 发送 HTTP get 请求?
How can I throttle sending HTTP get requests via Monix?
在我之前的 , and with insights from Artem 的基础上,我的 objective 是向给定的 url 发送获取请求,并使用 Monix 的节流功能 space 发出请求(以避免达到速率限制)。
预期的工作流程类似于:
make 1 (or more) api call(s) -> apply back-pressure/pausing (based on throttle) -> make the next request -> so on and so forth..
这是我迄今为止尝试过的方法(下面是我实际代码的简化片段):
import sttp.client.asynchttpclient.monix._
import monix.eval.Task
import monix.reactive.Observable
import sttp.client.{Response, UriContext}
import scala.concurrent.duration.DurationInt
object ObservableTest extends App {
val activities = AsyncHttpClientMonixBackend().flatMap { implicit backend =>
val ids: Task[List[Int]] = Task { (1 to 3).toList }
val f: String => Task[Response[Either[String, String]]] = (i: String) => fetch(uri"$i", "")
val data: Task[List[Task[Response[Either[String, String]]]]] = ids map (_ map (_ => f("https://heloooo.free.beeceptor.com/my/api/path")))
data.guarantee(backend.close())
}
import monix.execution.Scheduler.Implicits.global
val flat: Unit = activities.runToFuture.foreach { x =>
val r: List[Task[Response[Either[String, String]]]] = x // List with size 3
Observable
.fromIterable(r)
.throttle(6 second, 1)
.map(_.runToFuture)
.subscribe()
}
while (true) {}
}
获取数据的函数如下所示:
def fetch(uri: Uri, auth: String)(implicit
backend: SttpBackend[Task, Observable[ByteBuffer], WebSocketHandler]
) = {
println(uri)
val task = basicRequest
.get(uri)
.header("accept", "application/json")
.header("Authorization", auth)
.response(asString)
.send()
task
}
我已经尝试 运行 上述代码,但我仍然看到所有 get 请求都被触发,中间没有任何间隔。
举例来说,我当前的 api 通话记录类似于:
//(https://mr.foos.api/v1), Sat Aug 08 18:47:15 CEST 2020)
//(https://mr.foos.api/v1), Sat Aug 08 18:47:15 CEST 2020)
//(https://mr.foos.api/v1), Sat Aug 08 18:47:15 CEST 2020)
//(https://mr.foos.api/v1), Sat Aug 08 18:47:15 CEST 2020)
我正在努力实现类似于:
//(https://mr.foos.api/v1), Sat Aug 08 18:50:15 CEST 2020)
//(https://mr.foos.api/v1), Sat Aug 08 18:50:18 CEST 2020)
//(https://mr.foos.api/v1), Sat Aug 08 18:50:21 CEST 2020)
//(https://mr.foos.api/v1), Sat Aug 08 18:50:24 CEST 2020)
更新:
- 我已将 api 设置为可使用 Beeceptor 进行模拟。在我看来,打印语句是由调用函数发出的,但实际上并未发送请求。我还更新了我的函数调用以解析为字符串(只是为了简单起见)但是,当我尝试限制对模拟的请求时 api 它仍然没有收到任何请求。
所以如果我没理解错的话你有这样的类型:
object ObservableTest extends App {
type Response = Either[ResponseError[Error], Activities]
case class Activities()
val activities: Task[List[Response]] = AsyncHttpClientMonixBackend().flatMap { implicit backend =>
def fetchData(uri: String): Task[Response] = ???
val ids: Task[List[Int]] = ??? // a Task containing a List of IDs (from a previous step)
val func: String => Task[Response] = (i: String) => fetchData("someUri") // a function that will be used to call an endpoint
val data: Task[List[Task[Response]]] = ids map (_ map (id => func(id.toString))) // Maps API calling-function to the ids
val activitiesData: Task[List[Response]] = data.flatMap(Task.parSequenceUnordered(_)) // Flattenned the previous step
activitiesData.guarantee(backend.close())
}
import monix.execution.Scheduler.Implicits.global
Observable(activities)
.throttle(3 second, 1)
.subscribe()
}
你的代码中的问题是你限制了一个包含多个动作的大任务,其中一些甚至是并行的(但这不是问题的根源)。
即使在类型中,您也可以看到 - 您应该从任务列表(每个任务都会受到限制)而不是列表的任务中进行观察。
我实际上不知道 id 从何而来,它可以作为评估管道的基石。但是如果我们像示例中那样与他们一起完成任务。
我们会这样做的。
import monix.eval.Task
import sttp.client.asynchttpclient.monix._
import monix.eval.Task._
import monix.reactive.Observable
import sttp.client.ResponseError
import scala.concurrent.duration.DurationInt
object ObservableTest extends App {
type Response = Either[ResponseError[Error], Activity]
case class Activity()
val activities: Task[List[Task[Response]]] = AsyncHttpClientMonixBackend().flatMap { implicit backend =>
def fetchData(uri: String): Task[Response] = Task {
println("mocked http request")
Right(Activity())
}
val ids: Task[List[Int]] = Task { (1 to 100).toList} // a Task containing a List of IDs (from a previous step)
val func: Int => Task[Response] = (i: Int) => fetchData(s"someUri_$i") // a function that will be used to call an endpoint
val data: Task[List[Task[Response]]] = ids.map(_.map(func)) // Maps API calling-function to the ids
data.guarantee(backend.close())
}
import monix.execution.Scheduler.Implicits.global
Observable.fromTask(activities)
.flatMap { listOfFetches: List[Task[Response]] =>
Observable.fromIterable(listOfFetches)
}
.throttle(3.second, 1)
.map(_.runToFuture)
.subscribe()
while(true) {}
}
我们限制了一个提取列表,而不是在其中执行所有提取的任务。
PS: 有什么不清楚的请提问,我会在代码中添加注释
在我之前的
预期的工作流程类似于:
make 1 (or more) api call(s) -> apply back-pressure/pausing (based on throttle) -> make the next request -> so on and so forth..
这是我迄今为止尝试过的方法(下面是我实际代码的简化片段):
import sttp.client.asynchttpclient.monix._
import monix.eval.Task
import monix.reactive.Observable
import sttp.client.{Response, UriContext}
import scala.concurrent.duration.DurationInt
object ObservableTest extends App {
val activities = AsyncHttpClientMonixBackend().flatMap { implicit backend =>
val ids: Task[List[Int]] = Task { (1 to 3).toList }
val f: String => Task[Response[Either[String, String]]] = (i: String) => fetch(uri"$i", "")
val data: Task[List[Task[Response[Either[String, String]]]]] = ids map (_ map (_ => f("https://heloooo.free.beeceptor.com/my/api/path")))
data.guarantee(backend.close())
}
import monix.execution.Scheduler.Implicits.global
val flat: Unit = activities.runToFuture.foreach { x =>
val r: List[Task[Response[Either[String, String]]]] = x // List with size 3
Observable
.fromIterable(r)
.throttle(6 second, 1)
.map(_.runToFuture)
.subscribe()
}
while (true) {}
}
获取数据的函数如下所示:
def fetch(uri: Uri, auth: String)(implicit
backend: SttpBackend[Task, Observable[ByteBuffer], WebSocketHandler]
) = {
println(uri)
val task = basicRequest
.get(uri)
.header("accept", "application/json")
.header("Authorization", auth)
.response(asString)
.send()
task
}
我已经尝试 运行 上述代码,但我仍然看到所有 get 请求都被触发,中间没有任何间隔。
举例来说,我当前的 api 通话记录类似于:
//(https://mr.foos.api/v1), Sat Aug 08 18:47:15 CEST 2020)
//(https://mr.foos.api/v1), Sat Aug 08 18:47:15 CEST 2020)
//(https://mr.foos.api/v1), Sat Aug 08 18:47:15 CEST 2020)
//(https://mr.foos.api/v1), Sat Aug 08 18:47:15 CEST 2020)
我正在努力实现类似于:
//(https://mr.foos.api/v1), Sat Aug 08 18:50:15 CEST 2020)
//(https://mr.foos.api/v1), Sat Aug 08 18:50:18 CEST 2020)
//(https://mr.foos.api/v1), Sat Aug 08 18:50:21 CEST 2020)
//(https://mr.foos.api/v1), Sat Aug 08 18:50:24 CEST 2020)
更新:
- 我已将 api 设置为可使用 Beeceptor 进行模拟。在我看来,打印语句是由调用函数发出的,但实际上并未发送请求。我还更新了我的函数调用以解析为字符串(只是为了简单起见)但是,当我尝试限制对模拟的请求时 api 它仍然没有收到任何请求。
所以如果我没理解错的话你有这样的类型:
object ObservableTest extends App {
type Response = Either[ResponseError[Error], Activities]
case class Activities()
val activities: Task[List[Response]] = AsyncHttpClientMonixBackend().flatMap { implicit backend =>
def fetchData(uri: String): Task[Response] = ???
val ids: Task[List[Int]] = ??? // a Task containing a List of IDs (from a previous step)
val func: String => Task[Response] = (i: String) => fetchData("someUri") // a function that will be used to call an endpoint
val data: Task[List[Task[Response]]] = ids map (_ map (id => func(id.toString))) // Maps API calling-function to the ids
val activitiesData: Task[List[Response]] = data.flatMap(Task.parSequenceUnordered(_)) // Flattenned the previous step
activitiesData.guarantee(backend.close())
}
import monix.execution.Scheduler.Implicits.global
Observable(activities)
.throttle(3 second, 1)
.subscribe()
}
你的代码中的问题是你限制了一个包含多个动作的大任务,其中一些甚至是并行的(但这不是问题的根源)。 即使在类型中,您也可以看到 - 您应该从任务列表(每个任务都会受到限制)而不是列表的任务中进行观察。
我实际上不知道 id 从何而来,它可以作为评估管道的基石。但是如果我们像示例中那样与他们一起完成任务。 我们会这样做的。
import monix.eval.Task
import sttp.client.asynchttpclient.monix._
import monix.eval.Task._
import monix.reactive.Observable
import sttp.client.ResponseError
import scala.concurrent.duration.DurationInt
object ObservableTest extends App {
type Response = Either[ResponseError[Error], Activity]
case class Activity()
val activities: Task[List[Task[Response]]] = AsyncHttpClientMonixBackend().flatMap { implicit backend =>
def fetchData(uri: String): Task[Response] = Task {
println("mocked http request")
Right(Activity())
}
val ids: Task[List[Int]] = Task { (1 to 100).toList} // a Task containing a List of IDs (from a previous step)
val func: Int => Task[Response] = (i: Int) => fetchData(s"someUri_$i") // a function that will be used to call an endpoint
val data: Task[List[Task[Response]]] = ids.map(_.map(func)) // Maps API calling-function to the ids
data.guarantee(backend.close())
}
import monix.execution.Scheduler.Implicits.global
Observable.fromTask(activities)
.flatMap { listOfFetches: List[Task[Response]] =>
Observable.fromIterable(listOfFetches)
}
.throttle(3.second, 1)
.map(_.runToFuture)
.subscribe()
while(true) {}
}
我们限制了一个提取列表,而不是在其中执行所有提取的任务。
PS: 有什么不清楚的请提问,我会在代码中添加注释