Monix 并行任务的错误处理(使用 parMap)

Error Handling on Monix parallel tasks (using parMap)

我正在尝试使用 monix 来并行化某些操作,然后执行错误处理

假设我正在尝试解析和验证这样的几个对象

def parseAndValidateX(x: X) Task[X]

def parseAndValidateY(y: Y): Task[Y]

这里的X和Y是我定义的一些类型

现在,这些方法中的每一个都会评估一些标准,并且 return 是一个任务。如果评估失败,我有一些形式的代码

Task.raiseError(new CustomException("X is invalid because certain reasons a,b,c"))

我对 Y 有类似的加薪任务。

Task.raiseError(new CustomException("Y is invalid because certain reasons a',b',c'"))

现在我有这个类型了

case class Combined(x: X, y: Y)

我定义了这个

private def parseAndValidateCombined(x: X, y: Y): Task[Combined] = {
  val xTask = parseAndValidateX(x)
  val yTask = parseAndValidateY(y)
     
  Task.parMap2(xTask, yTask) {
    (xEval, yEval) => SecretData(xEval, yTask)
  }
}

这应该允许我 运行 并行验证,果然我得到了响应。

不过我也想要这种行为

如果两个任务都失败了,我想return这样的错误

Task.raiseError(new CustomException("X is invalid because certain reasons a,b,c and "Y is invalid because certain reasons a',b',c'"))

我好像做不到。根据两个任务中哪一个失败,我只能在 parMap2 输出的 onRecover 方法上获得两个失败之一。

万一两者都失败了,我只得到任务 X 的错误。

我是否有可能以完全异步的方式完成我在 Monix 上所做的事情(例如,也许其他一些方法可以将任务组合在一起)?或者我是否必须阻止 exec,单独获取错误并重新组合值?

只有parMap2,是不可能完成你想做的事情的。 documentation 表示:

In case one of the tasks fails, then all other tasks get cancelled and the final result will be a failure.

但是,可以实现希望暴露 错误,而不是隐藏在 monadic 处理后面。这可以通过 materialize 方法实现。

因此,例如,您可以将您的方法实现为:

private def parseAndValidateCombined[X, Y](x: X, y: Y): Task[Combined] = {
  val xTask = parseAndValidate(x).materialize // turn on Task[Try[X]]
  val yTask = parseAndValidate(y).materialize // turn on Task[Try[Y]]
     
  Task.parMap2(xTask, yTask) {
    case (Success(xEval), Success(yEval)) => Success(SecretData(xEval, yEval))
    case (Failure(_), Failure(_)) => Failure[Combined](new CustomException(....))
    case (Failure(left), _) => Failure[Combined](left)
    case (_, Failure(right)) => Failure[Combined](right)
  }.dematerialize // turn to Task[Combined]
}