Monix 并行任务的错误处理(使用 parMap)
Error Handling on Monix parallel tasks (using parMap)
我正在尝试使用 monix 来并行化某些操作,然后执行错误处理
假设我正在尝试解析和验证这样的几个对象
def parseAndValidateX(x: X) Task[X]
和
def parseAndValidateY(y: Y): Task[Y]
这里的X和Y是我定义的一些类型
现在,这些方法中的每一个都会评估一些标准,并且 return 是一个任务。如果评估失败,我有一些形式的代码
Task.raiseError(new CustomException("X is invalid because certain reasons a,b,c"))
我对 Y 有类似的加薪任务。
Task.raiseError(new CustomException("Y is invalid because certain reasons a',b',c'"))
现在我有这个类型了
case class Combined(x: X, y: Y)
我定义了这个
private def parseAndValidateCombined(x: X, y: Y): Task[Combined] = {
val xTask = parseAndValidateX(x)
val yTask = parseAndValidateY(y)
Task.parMap2(xTask, yTask) {
(xEval, yEval) => SecretData(xEval, yTask)
}
}
这应该允许我 运行 并行验证,果然我得到了响应。
不过我也想要这种行为
如果两个任务都失败了,我想return这样的错误
Task.raiseError(new CustomException("X is invalid because certain reasons a,b,c and "Y is invalid because certain reasons a',b',c'"))
我好像做不到。根据两个任务中哪一个失败,我只能在 parMap2 输出的 onRecover 方法上获得两个失败之一。
万一两者都失败了,我只得到任务 X 的错误。
我是否有可能以完全异步的方式完成我在 Monix 上所做的事情(例如,也许其他一些方法可以将任务组合在一起)?或者我是否必须阻止 exec,单独获取错误并重新组合值?
只有parMap2
,是不可能完成你想做的事情的。
documentation 表示:
In case one of the tasks fails, then all other tasks get
cancelled and the final result will be a failure.
但是,可以实现希望暴露 错误,而不是隐藏在 monadic 处理后面。这可以通过 materialize
方法实现。
因此,例如,您可以将您的方法实现为:
private def parseAndValidateCombined[X, Y](x: X, y: Y): Task[Combined] = {
val xTask = parseAndValidate(x).materialize // turn on Task[Try[X]]
val yTask = parseAndValidate(y).materialize // turn on Task[Try[Y]]
Task.parMap2(xTask, yTask) {
case (Success(xEval), Success(yEval)) => Success(SecretData(xEval, yEval))
case (Failure(_), Failure(_)) => Failure[Combined](new CustomException(....))
case (Failure(left), _) => Failure[Combined](left)
case (_, Failure(right)) => Failure[Combined](right)
}.dematerialize // turn to Task[Combined]
}
我正在尝试使用 monix 来并行化某些操作,然后执行错误处理
假设我正在尝试解析和验证这样的几个对象
def parseAndValidateX(x: X) Task[X]
和
def parseAndValidateY(y: Y): Task[Y]
这里的X和Y是我定义的一些类型
现在,这些方法中的每一个都会评估一些标准,并且 return 是一个任务。如果评估失败,我有一些形式的代码
Task.raiseError(new CustomException("X is invalid because certain reasons a,b,c"))
我对 Y 有类似的加薪任务。
Task.raiseError(new CustomException("Y is invalid because certain reasons a',b',c'"))
现在我有这个类型了
case class Combined(x: X, y: Y)
我定义了这个
private def parseAndValidateCombined(x: X, y: Y): Task[Combined] = {
val xTask = parseAndValidateX(x)
val yTask = parseAndValidateY(y)
Task.parMap2(xTask, yTask) {
(xEval, yEval) => SecretData(xEval, yTask)
}
}
这应该允许我 运行 并行验证,果然我得到了响应。
不过我也想要这种行为
如果两个任务都失败了,我想return这样的错误
Task.raiseError(new CustomException("X is invalid because certain reasons a,b,c and "Y is invalid because certain reasons a',b',c'"))
我好像做不到。根据两个任务中哪一个失败,我只能在 parMap2 输出的 onRecover 方法上获得两个失败之一。
万一两者都失败了,我只得到任务 X 的错误。
我是否有可能以完全异步的方式完成我在 Monix 上所做的事情(例如,也许其他一些方法可以将任务组合在一起)?或者我是否必须阻止 exec,单独获取错误并重新组合值?
只有parMap2
,是不可能完成你想做的事情的。
documentation 表示:
In case one of the tasks fails, then all other tasks get cancelled and the final result will be a failure.
但是,可以实现希望暴露 错误,而不是隐藏在 monadic 处理后面。这可以通过 materialize
方法实现。
因此,例如,您可以将您的方法实现为:
private def parseAndValidateCombined[X, Y](x: X, y: Y): Task[Combined] = {
val xTask = parseAndValidate(x).materialize // turn on Task[Try[X]]
val yTask = parseAndValidate(y).materialize // turn on Task[Try[Y]]
Task.parMap2(xTask, yTask) {
case (Success(xEval), Success(yEval)) => Success(SecretData(xEval, yEval))
case (Failure(_), Failure(_)) => Failure[Combined](new CustomException(....))
case (Failure(left), _) => Failure[Combined](left)
case (_, Failure(right)) => Failure[Combined](right)
}.dematerialize // turn to Task[Combined]
}