Monix 任务处理期货列表失败
Monix task handle failure for list of futures
任务异步执行失败如何处理? IE。至少打印堆栈跟踪并关闭。下面的代码似乎永远等待输入 > 5
val things = Range(1, 40)
implicit val scheduler = monix.execution.Scheduler.global
def t(i:Int) = Task.eval {
Try{
Thread.sleep(1000)
val result = i + 1
if(result > 5){
throw new Exception("asdf")
}
// i.e. write to file, that's why unit is returned
println(result) // Effect
"Result"
}
}
val futures = things.map(e=> t(e))
futures.foreach(_.runToFuture)
编辑
正在尝试:
futures.foreach(_.runToFuture.onComplete {
case Success(value) =>
println(value)
case Failure(ex) =>
System.err.println(ex)
System.exit(1)
})
不会停止计算。
如何记录堆栈跟踪并取消正在进行的计算并停止?
这个问题有两个部分:
- 使任务可取消。
- 当一个任务失败时取消兄弟姐妹。
使任务可取消
Monix 有 BooleanCancelable,它允许您在调用 cancel
时将 isCancelled
的结果设置为 true
。
cancel
也需要在Thread.sleep
为运行ning时调用Thread.interrupt
唤醒。否则 sleep
将 运行 完成它的过程。但是,这将在您的任务中抛出 InterruptedException。这需要处理。
取消兄弟姐妹
有CompositeCancelable。它似乎是 CompositeCancellable
的用例,它用于从父任务调用 cancel
。因此,一旦构建了 CompositeCancellable
(即构建了所有任务):
- 必须为每个任务提供对此的引用,以便失败的任务可以对其调用
cancel
。 (注意这是一种循环引用,最好避免)
- 或者当子任务失败并调用
cancel
时,父任务(或代码)会知道。 (这样可以避免循环引用)
通知兄弟任务的另一种方法是使用 AtomicBoolean 并经常检查它(休眠 10 毫秒而不是 1000 毫秒)。当一个任务失败时,它会设置这个布尔值,以便其他任务可以停止执行。这当然不涉及Cancellable
。 (这是一种 hack,最好使用 monix 调度程序)
备注
在 Task
中调用 Thread.sleep
是个好主意吗?我认为这会阻止另一个任务使用该线程。我认为使用调度程序来增加延迟并组合这些子任务是最有效地利用线程池的方法。
一种更惯用的方法是使用 Observable
而不是 Task
因为它正在处理数据列表(我假设这是用例,因为它在问题中显示) .
val obs = Observable
.fromIterable(Range(1, 40))
.mapEval(i =>
if (i + 1 > 5) Task.raiseError(new Exception("Error")) // will stop the stream
else Task.delay(println(i)) // Or write to file in your case
)
.completedL
.runToFuture
obs
.recover {
case NonFatal(e) => println("Error")
}
或者,您也可以使用 Either
指示错误,这样可以提高类型安全性,因为您需要处理 Either
结果。
val obs = Observable
.fromIterable(Range(1, 40))
.mapEval(i =>
if (i + 1 > 5) Task.pure(Left("Error"))
else Task.delay(println(i)).map(_ => Right(())) // Or write to file in your case
)
.takeWhileInclusive(_.isRight) // will also emit the failing result
.lastL
.runToFuture
obs.map {
case Left(err) => println("There's an error")
case _ => println("Completed successfully")
}
任务异步执行失败如何处理? IE。至少打印堆栈跟踪并关闭。下面的代码似乎永远等待输入 > 5
val things = Range(1, 40)
implicit val scheduler = monix.execution.Scheduler.global
def t(i:Int) = Task.eval {
Try{
Thread.sleep(1000)
val result = i + 1
if(result > 5){
throw new Exception("asdf")
}
// i.e. write to file, that's why unit is returned
println(result) // Effect
"Result"
}
}
val futures = things.map(e=> t(e))
futures.foreach(_.runToFuture)
编辑
正在尝试:
futures.foreach(_.runToFuture.onComplete {
case Success(value) =>
println(value)
case Failure(ex) =>
System.err.println(ex)
System.exit(1)
})
不会停止计算。 如何记录堆栈跟踪并取消正在进行的计算并停止?
这个问题有两个部分:
- 使任务可取消。
- 当一个任务失败时取消兄弟姐妹。
使任务可取消
Monix 有 BooleanCancelable,它允许您在调用 cancel
时将 isCancelled
的结果设置为 true
。
cancel
也需要在Thread.sleep
为运行ning时调用Thread.interrupt
唤醒。否则 sleep
将 运行 完成它的过程。但是,这将在您的任务中抛出 InterruptedException。这需要处理。
取消兄弟姐妹
有CompositeCancelable。它似乎是 CompositeCancellable
的用例,它用于从父任务调用 cancel
。因此,一旦构建了 CompositeCancellable
(即构建了所有任务):
- 必须为每个任务提供对此的引用,以便失败的任务可以对其调用
cancel
。 (注意这是一种循环引用,最好避免) - 或者当子任务失败并调用
cancel
时,父任务(或代码)会知道。 (这样可以避免循环引用)
通知兄弟任务的另一种方法是使用 AtomicBoolean 并经常检查它(休眠 10 毫秒而不是 1000 毫秒)。当一个任务失败时,它会设置这个布尔值,以便其他任务可以停止执行。这当然不涉及Cancellable
。 (这是一种 hack,最好使用 monix 调度程序)
备注
在 Task
中调用 Thread.sleep
是个好主意吗?我认为这会阻止另一个任务使用该线程。我认为使用调度程序来增加延迟并组合这些子任务是最有效地利用线程池的方法。
一种更惯用的方法是使用 Observable
而不是 Task
因为它正在处理数据列表(我假设这是用例,因为它在问题中显示) .
val obs = Observable
.fromIterable(Range(1, 40))
.mapEval(i =>
if (i + 1 > 5) Task.raiseError(new Exception("Error")) // will stop the stream
else Task.delay(println(i)) // Or write to file in your case
)
.completedL
.runToFuture
obs
.recover {
case NonFatal(e) => println("Error")
}
或者,您也可以使用 Either
指示错误,这样可以提高类型安全性,因为您需要处理 Either
结果。
val obs = Observable
.fromIterable(Range(1, 40))
.mapEval(i =>
if (i + 1 > 5) Task.pure(Left("Error"))
else Task.delay(println(i)).map(_ => Right(())) // Or write to file in your case
)
.takeWhileInclusive(_.isRight) // will also emit the failing result
.lastL
.runToFuture
obs.map {
case Left(err) => println("There's an error")
case _ => println("Completed successfully")
}