在 Observable 消耗完成之前,如何阻止终止我的程序?

How can I block terminating my program until the Observable consumption is complete?

我目前正在尝试使用 Monix 来限制 api get 请求。我试过使用 STTP 的 Monix 后端,它工作正常,直到我完成后无法关闭 Monix 后端......因为这看起来更像是一个 sttp 问题而不是 Monix 问题,我试图重新解决这个问题使用 sttp 的默认后端,同时仍然使用 Monix 进行节流。

我主要是在使用完可观察对象后关闭 monix 后端

我试图通过以下方式简化问题:

  import monix.execution.Scheduler.Implicits.global

  val someIter = List(Task(1), Task(2))

  val obs: Observable[CancelableFuture[Int]] = Observable
    .fromIterable(someIter)
    .throttle(3.second, 1)
    .map(_.runToFuture)

但是,我仍然不确定如何在使用 Observable 后关闭程序,因为它在这里过早终止(与 monix 后端情况不同)...

换句话说,如何在 Observable 迭代完成之前阻止程序终止?

您可以创建 Promise,当 Observable.doOnComplete

完成时完成

并在主线程中等待它。

import monix.execution.Scheduler.Implicits.global

val someIter = List(Task(1), Task(2))
val promise = Promise()
val obs: Observable[CancelableFuture[Int]] = Observable.fromIterable(someIter).throttle(3.second, 1)
  .map(_.runToFuture)
  .doOnComplete(Task { promise.complete(Success()) })

Await.ready(promise.future, Duration.Inf)

除了 Artem, and with insights from the Monix Gitter community 接受的答案之外,另一个可能的实现可能是:

  val someIter = List(Task(1), Task(2))
  val obs =
    Observable
      .fromIterable(someIter)
      .throttle(1 second, 10)
      .mapParallelUnordered(10)(x => x.map(x => x.send().body)) // Here we send requests
      .sumL // Sum just as an example 
      .runSyncUnsafe()