Monix 任务处理期货列表失败

Monix task handle failure for list of futures

任务异步执行失败如何处理? IE。至少打印堆栈跟踪并关闭。下面的代码似乎永远等待输入 > 5

val things = Range(1, 40)
  implicit val scheduler = monix.execution.Scheduler.global
  def t(i:Int) = Task.eval {
      Try{
        Thread.sleep(1000)
        val result = i + 1
        if(result > 5){
          throw new Exception("asdf")
        }
        // i.e. write to file, that's why unit is returned
        println(result) // Effect
        "Result"
      }
    }
    val futures = things.map(e=> t(e))
  futures.foreach(_.runToFuture)

编辑

正在尝试:

futures.foreach(_.runToFuture.onComplete {
    case Success(value) =>
      println(value)
    case Failure(ex) =>
      System.err.println(ex)
      System.exit(1)
  })

不会停止计算。 如何记录堆栈跟踪并取消正在进行的计算并停止?

这个问题有两个部分:

  • 使任务可取消。
  • 当一个任务失败时取消兄弟姐妹。

使任务可取消

Monix 有 BooleanCancelable,它允许您在调用 cancel 时将 isCancelled 的结果设置为 true

cancel也需要在Thread.sleep为运行ning时调用Thread.interrupt唤醒。否则 sleep 将 运行 完成它的过程。但是,这将在您的任务中抛出 InterruptedException。这需要处理。

取消兄弟姐妹

CompositeCancelable。它似乎是 CompositeCancellable 的用例,它用于从父任务调用 cancel。因此,一旦构建了 CompositeCancellable(即构建了所有任务):

  • 必须为每个任务提供对此的引用,以便失败的任务可以对其调用 cancel。 (注意这是一种循环引用,最好避免)
  • 或者当子任务失败并调用 cancel 时,父任务(或代码)会知道。 (这样可以避免循环引用)

通知兄弟任务的另一种方法是使用 AtomicBoolean 并经常检查它(休眠 10 毫秒而不是 1000 毫秒)。当一个任务失败时,它会设置这个布尔值,以便其他任务可以停止执行。这当然不涉及Cancellable。 (这是一种 hack,最好使用 monix 调度程序)

备注

Task 中调用 Thread.sleep 是个好主意吗?我认为这会阻止另一个任务使用该线程。我认为使用调度程序来增加延迟并组合这些子任务是最有效地利用线程池的方法。

一种更惯用的方法是使用 Observable 而不是 Task 因为它正在处理数据列表(我假设这是用例,因为它在问题中显示) .

 val obs = Observable
  .fromIterable(Range(1, 40))
  .mapEval(i =>
    if (i + 1 > 5) Task.raiseError(new Exception("Error")) // will stop the stream
    else Task.delay(println(i)) // Or write to file in your case
  )
  .completedL
  .runToFuture


obs
  .recover {
    case NonFatal(e) => println("Error")
  }

或者,您也可以使用 Either 指示错误,这样可以提高类型安全性,因为您需要处理 Either 结果。

val obs = Observable
  .fromIterable(Range(1, 40))
  .mapEval(i =>
    if (i + 1 > 5) Task.pure(Left("Error"))
    else Task.delay(println(i)).map(_ => Right(())) // Or write to file in your case
  )
  .takeWhileInclusive(_.isRight) // will also emit the failing result
  .lastL
  .runToFuture


obs.map {
  case Left(err) => println("There's an error")
  case _ => println("Completed successfully")
}