我如何运行 parSequenceUnordered of Monix,并处理每个任务的结果?
How can I run parSequenceUnordered of Monix, and handle the results of each task?
我目前正在致力于实现对 API 的客户端 http 请求,并决定为此任务探索 sttp 和 monix。由于我是 Monix 的新手,我仍然不确定如何 运行 任务并检索它们的结果。我的objective是有一系列http请求结果,我可以并行调用->解析->加载。
以下是我迄今为止尝试过的片段:
import sttp.client._
import sttp.client.asynchttpclient.monix._
import monix.eval.Task
object SO extends App {
val postTask = AsyncHttpClientMonixBackend().flatMap { implicit backend =>
val r1 = basicRequest.get(uri"https://hello.world.io/v1/bla")
.header("accept", "application/json")
.response(asString)
.body()
.send()
val tasks = Seq(r1).map(i => Task(i))
Task.parSequenceUnordered(tasks).guarantee(backend.close())
}
import monix.execution.Scheduler.Implicits.global
postTask.runToFuture.foreach(println) // prints: List(Task.FlatMap52527361)
}
我的困惑很简单(我猜)。我如何 运行 我创建的 Task.parSequenceUnordered
,并处理(解析 http 结果)序列中的任务?
Nice to have: 出于好奇,是否可以在处理请求的Task序列时天真地引入rate-limiting/throttling?我并不是真的在寻找构建复杂的东西。它可以像分隔成批请求一样简单。想知道 Monix 是否已经有帮助者。
感谢 Oleg Pyzhcov and the monix gitter community 帮我解决了这个问题。
这里引用奥列格的话:
Since you're using backend with monix support already, the type of r1
is Task[Response[Either[String,String]]]
. So when you're doing
Seq(r1).map(i => Task(i))
, you make it a sequence of tasks that don't
do anything except give you other tasks that give you result (the type
would be Seq[Task[Task[Response[...]]]]
). Your code then parallelizes
the outer layer, tasks-that-give-tasks, and you get the tasks that you
started with as the result. You only need to process a Seq(r1) for it
to run requests in parallel.
If you're using Intellij, you can press Alt + =
to see the type of
selection - it helps if you can't tell the type from the code alone
(but it gets better with experience).
As for rate-limiting, we have parSequenceN that lets you set a limit
to parallelism. Note that unordered only means that you get slight
performance advantage at the cost of results being in random order in
the output, they are executed non-deterministically anyway.
我最终得到了一个(简化的)实现,看起来像这样:
import sttp.client._
import sttp.client.asynchttpclient.monix._
import monix.eval.Task
object SO extends App {
val postTask = AsyncHttpClientMonixBackend().flatMap { implicit backend =>
val r1 = basicRequest.get(uri"https://hello.world.io/v1/bla")
.header("accept", "application/json")
.response(asString)
.body()
.send()
val items = Seq(r1.map(x => x.body))
Task.parSequenceN(1)(items).guarantee(backend.close())
}
import monix.execution.Scheduler.Implicits.global
postTask.runToFuture.foreach(println)
}
我目前正在致力于实现对 API 的客户端 http 请求,并决定为此任务探索 sttp 和 monix。由于我是 Monix 的新手,我仍然不确定如何 运行 任务并检索它们的结果。我的objective是有一系列http请求结果,我可以并行调用->解析->加载。
以下是我迄今为止尝试过的片段:
import sttp.client._
import sttp.client.asynchttpclient.monix._
import monix.eval.Task
object SO extends App {
val postTask = AsyncHttpClientMonixBackend().flatMap { implicit backend =>
val r1 = basicRequest.get(uri"https://hello.world.io/v1/bla")
.header("accept", "application/json")
.response(asString)
.body()
.send()
val tasks = Seq(r1).map(i => Task(i))
Task.parSequenceUnordered(tasks).guarantee(backend.close())
}
import monix.execution.Scheduler.Implicits.global
postTask.runToFuture.foreach(println) // prints: List(Task.FlatMap52527361)
}
我的困惑很简单(我猜)。我如何 运行 我创建的 Task.parSequenceUnordered
,并处理(解析 http 结果)序列中的任务?
Nice to have: 出于好奇,是否可以在处理请求的Task序列时天真地引入rate-limiting/throttling?我并不是真的在寻找构建复杂的东西。它可以像分隔成批请求一样简单。想知道 Monix 是否已经有帮助者。
感谢 Oleg Pyzhcov and the monix gitter community 帮我解决了这个问题。
这里引用奥列格的话:
Since you're using backend with monix support already, the type of r1 is
Task[Response[Either[String,String]]]
. So when you're doingSeq(r1).map(i => Task(i))
, you make it a sequence of tasks that don't do anything except give you other tasks that give you result (the type would beSeq[Task[Task[Response[...]]]]
). Your code then parallelizes the outer layer, tasks-that-give-tasks, and you get the tasks that you started with as the result. You only need to process a Seq(r1) for it to run requests in parallel.If you're using Intellij, you can press
Alt + =
to see the type of selection - it helps if you can't tell the type from the code alone (but it gets better with experience).As for rate-limiting, we have parSequenceN that lets you set a limit to parallelism. Note that unordered only means that you get slight performance advantage at the cost of results being in random order in the output, they are executed non-deterministically anyway.
我最终得到了一个(简化的)实现,看起来像这样:
import sttp.client._
import sttp.client.asynchttpclient.monix._
import monix.eval.Task
object SO extends App {
val postTask = AsyncHttpClientMonixBackend().flatMap { implicit backend =>
val r1 = basicRequest.get(uri"https://hello.world.io/v1/bla")
.header("accept", "application/json")
.response(asString)
.body()
.send()
val items = Seq(r1.map(x => x.body))
Task.parSequenceN(1)(items).guarantee(backend.close())
}
import monix.execution.Scheduler.Implicits.global
postTask.runToFuture.foreach(println)
}