在 Observable 消耗完成之前,如何阻止终止我的程序?
How can I block terminating my program until the Observable consumption is complete?
我目前正在尝试使用 Monix 来限制 api get 请求。我试过使用 STTP 的 Monix 后端,它工作正常,直到我完成后无法关闭 Monix 后端......因为这看起来更像是一个 sttp 问题而不是 Monix 问题,我试图重新解决这个问题使用 sttp 的默认后端,同时仍然使用 Monix 进行节流。
我主要是在使用完可观察对象后关闭 monix 后端
我试图通过以下方式简化问题:
import monix.execution.Scheduler.Implicits.global
val someIter = List(Task(1), Task(2))
val obs: Observable[CancelableFuture[Int]] = Observable
.fromIterable(someIter)
.throttle(3.second, 1)
.map(_.runToFuture)
但是,我仍然不确定如何在使用 Observable 后关闭程序,因为它在这里过早终止(与 monix 后端情况不同)...
换句话说,如何在 Observable 迭代完成之前阻止程序终止?
您可以创建 Promise
,当 Observable
由 .doOnComplete
完成时完成
并在主线程中等待它。
import monix.execution.Scheduler.Implicits.global
val someIter = List(Task(1), Task(2))
val promise = Promise()
val obs: Observable[CancelableFuture[Int]] = Observable.fromIterable(someIter).throttle(3.second, 1)
.map(_.runToFuture)
.doOnComplete(Task { promise.complete(Success()) })
Await.ready(promise.future, Duration.Inf)
除了 Artem, and with insights from the Monix Gitter community 接受的答案之外,另一个可能的实现可能是:
val someIter = List(Task(1), Task(2))
val obs =
Observable
.fromIterable(someIter)
.throttle(1 second, 10)
.mapParallelUnordered(10)(x => x.map(x => x.send().body)) // Here we send requests
.sumL // Sum just as an example
.runSyncUnsafe()
我目前正在尝试使用 Monix 来限制 api get 请求。我试过使用 STTP 的 Monix 后端,它工作正常,直到我完成后无法关闭 Monix 后端......因为这看起来更像是一个 sttp 问题而不是 Monix 问题,我试图重新解决这个问题使用 sttp 的默认后端,同时仍然使用 Monix 进行节流。
我主要是在使用完可观察对象后关闭 monix 后端
我试图通过以下方式简化问题:
import monix.execution.Scheduler.Implicits.global
val someIter = List(Task(1), Task(2))
val obs: Observable[CancelableFuture[Int]] = Observable
.fromIterable(someIter)
.throttle(3.second, 1)
.map(_.runToFuture)
但是,我仍然不确定如何在使用 Observable 后关闭程序,因为它在这里过早终止(与 monix 后端情况不同)...
换句话说,如何在 Observable 迭代完成之前阻止程序终止?
您可以创建 Promise
,当 Observable
由 .doOnComplete
并在主线程中等待它。
import monix.execution.Scheduler.Implicits.global
val someIter = List(Task(1), Task(2))
val promise = Promise()
val obs: Observable[CancelableFuture[Int]] = Observable.fromIterable(someIter).throttle(3.second, 1)
.map(_.runToFuture)
.doOnComplete(Task { promise.complete(Success()) })
Await.ready(promise.future, Duration.Inf)
除了 Artem, and with insights from the Monix Gitter community 接受的答案之外,另一个可能的实现可能是:
val someIter = List(Task(1), Task(2))
val obs =
Observable
.fromIterable(someIter)
.throttle(1 second, 10)
.mapParallelUnordered(10)(x => x.map(x => x.send().body)) // Here we send requests
.sumL // Sum just as an example
.runSyncUnsafe()