Execute DAG-like operations in Scala Future
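I am working on a use case where I have to execute interdependent operations (defined as a directed acyclic graph) using Scala Future. Basically, each operation (say, a node of the DAG) is executed in a Future, and once the current node's Future completes, the dependent nodes are triggered (they should also run in Futures). This continues until every node has finished processing or one of the nodes fails. So far I have (minimal code):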
def run(node: Node, result: Result): Unit = {
  val f: Future[(Node, Result)] = Future {
    // process current Node
    ...
  }
  f onComplete {
    case Success(x) =>
      val n = x._1 // Current Node
      val r = x._2 // Result of current Node
      if (!n.isLeaf()) {
        n.children.foreach { z =>
          run(z, r)
        }
      }
    case Failure(e) => throw e
  }
}
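Is this the right way to solve the problem (invoking another Future inside a callback)? Also, I have no proper way to stop the other running Futures once processing of one of the nodes fails.
Can this be solved with Future composition? If so, how can I achieve that?
Thanks,
Praveen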
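Here is a more functional approach: instead of using Unit as the evaluation result of run/Future, we can use a generic type. Usually you want to process the result of a Future functionally, rather than rely on its side effects.
I added type annotations and descriptive variable names to make it easier to follow. I also added a few cases to show how it fails. You could alternatively recover when a failure occurs instead of failing everything. For this problem, however, failing probably makes more sense if the child computations depend on the parent value.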
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
import scala.util.Try

case class Node[T](value: T, children: List[Node[T]])

object DagFuture extends App {
  def run[A, B](node: Node[A], result: B)(nodeEval: (Node[A], B) => B)(aggregator: List[B] => B): Future[B] = {
    // Evaluate the current node using the result passed down from its parent
    val nodeResult: Future[B] = Future(nodeEval(node, result))
    // Once the node is done, start its children and collect all results
    val allResults: Future[List[B]] = nodeResult.flatMap(r => Future.sequence(nodeResult :: node.children.map(x => run(x, r)(nodeEval)(aggregator))))
    val finalResult: Future[B] = allResults.map(cl => aggregator(cl))
    finalResult
  }

  val debugSum = (l: List[Int]) => {
    println(s"aggregating: $l")
    l.sum
  }

  // Wraps a node evaluation so that each attempt is logged before the result (or exception) is returned
  def debugNodeEval(f: (Node[Int], Int) => Int)(n: Node[Int], r: Int) = {
    val eval = Try { f(n, r) }
    println(s"node: $n, result: $r, eval: $eval")
    eval.get
  }

  val debugNodeEvalDefault = debugNodeEval((n, r) => n.value + r) _

  val singleNodeDag = Node(1, Nil)
  val multiNodeDag = Node(1, List(Node(20, Nil), Node(300, Nil)))

  println("\nSINGLE NODE DAG EXAMPLE:")
  val singleNodeFuture = run(singleNodeDag, 0)(debugNodeEvalDefault)(debugSum)
  // 5.seconds instead of the postfix form `5 seconds`, which needs scala.language.postfixOps
  val singleNodeResult = Await.result(singleNodeFuture, 5.seconds)
  println(s"Single node result: $singleNodeResult")

  println("\nDAG PATH LENGTH EXAMPLE:")
  val pathLengthFuture = run(multiNodeDag, 0)(debugNodeEvalDefault)(debugSum)
  val pathLengthResult = Await.result(pathLengthFuture, 5.seconds)
  println(s"Path length: $pathLengthResult")

  println("\nFAILED DAG ROOT NODE EXAMPLE:")
  val failedRootNodeFuture = run(multiNodeDag, 0)(debugNodeEval((n, r) => throw new Exception))(debugSum)
  val failedRootNodePromise = Await.ready(failedRootNodeFuture, 5.seconds)
  println(s"Failed root node: ${failedRootNodePromise.value}")

  println("\nFAILED DAG CHILD NODE EXAMPLE:")
  val failedChildNodeFuture = run(multiNodeDag, 0)(debugNodeEval((n, r) => if (n.value == 300) throw new Exception else n.value + r))(debugSum)
  val failedChildNodePromise = Await.ready(failedChildNodeFuture, 5.seconds)
  println(s"Failed child node: ${failedChildNodePromise.value}")
}
This prints:
SINGLE NODE DAG EXAMPLE:
node: Node(1,List()), result: 0, eval: Success(1)
aggregating: List(1)
Single node result: 1
DAG PATH LENGTH EXAMPLE:
node: Node(1,List(Node(20,List()), Node(300,List()))), result: 0, eval: Success(1)
node: Node(20,List()), result: 1, eval: Success(21)
node: Node(300,List()), result: 1, eval: Success(301)
aggregating: List(301)
aggregating: List(21)
aggregating: List(1, 21, 301)
Path length: 323
FAILED DAG ROOT NODE EXAMPLE:
node: Node(1,List(Node(20,List()), Node(300,List()))), result: 0, eval: Failure(java.lang.Exception)
Failed root node: Some(Failure(java.lang.Exception))
FAILED DAG CHILD NODE EXAMPLE:
node: Node(1,List(Node(20,List()), Node(300,List()))), result: 0, eval: Success(1)
node: Node(20,List()), result: 1, eval: Success(21)
aggregating: List(21)
node: Node(300,List()), result: 1, eval: Failure(java.lang.Exception)
Failed child node: Some(Failure(java.lang.Exception))
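The answer mentions that you could recover from a failure instead of failing everything. A minimal sketch of that variant follows, assuming a failed subtree should simply contribute a caller-supplied default. The names `RecoverSketch`, `runRecovering` and `fallback` are hypothetical and not part of the original answer; only `Future.recover`, `Future.sequence` and `NonFatal` come from the standard library.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
import scala.util.control.NonFatal

object RecoverSketch {
  case class Node[T](value: T, children: List[Node[T]])

  // Hypothetical variant of `run`: a child subtree that fails is replaced by
  // `fallback` (an assumed default value) instead of failing the whole DAG.
  def runRecovering[A, B](node: Node[A], result: B, fallback: B)
                         (nodeEval: (Node[A], B) => B)
                         (aggregator: List[B] => B): Future[B] = {
    val nodeResult: Future[B] = Future(nodeEval(node, result))
    nodeResult.flatMap { r =>
      val childResults: List[Future[B]] = node.children.map { child =>
        runRecovering(child, r, fallback)(nodeEval)(aggregator)
          .recover { case NonFatal(_) => fallback } // swallow the child's failure
      }
      // Combine this node's value with the (possibly recovered) child values
      Future.sequence(childResults).map(cs => aggregator(r :: cs))
    }
  }
}
```

With the same DAG as above, a node evaluation that throws for the value 300, a sum aggregator and a `fallback` of 0, the root evaluates to 1, the child 20 to 21, and the failed child is replaced by 0, so the overall result is 22 rather than a failed Future. A failure of the root node itself is still propagated, because `recover` is only applied to the children.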
TL;DR
def run[A, B](node: Node[A], result: B)(nodeEval: (Node[A], B) => B)(aggregator: Traversable[B] => B): Future[B] = {
  val nodeResult = Future(nodeEval(node, result))
  val allResults = nodeResult flatMap { r => Future.sequence(nodeResult :: node.children.map { x => run(x, r)(nodeEval)(aggregator) }) }
  allResults map aggregator
}
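Roughly speaking, it is just a Future.flatMap(result => Future.sequence(children ...)). When the parent Future completes, its result is passed to the children computations inside flatMap. If the parent Future fails, the whole computation fails as well. sequence combines the results from a list of Futures into a single Future. The child Futures are in turn parents to their own children, and so on, so the same failure mode applies at every level.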
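The question also asks how to stop the other running Futures once a node fails. A standard `scala.concurrent.Future` cannot be cancelled once it has started, so composition alone only prevents the descendants of a failed node from starting. One possible workaround, sketched here under that assumption, is a shared flag that every node checks before doing its work. The names `FailFastSketch`, `runFailFast`, `failed` and `Skipped` are hypothetical and not from the original answer.

```scala
import java.util.concurrent.atomic.AtomicBoolean
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
import scala.util.control.NonFatal

object FailFastSketch {
  case class Node[T](value: T, children: List[Node[T]])

  // Hypothetical marker exception for nodes that were skipped
  final class Skipped extends RuntimeException("skipped: another node already failed")

  // `failed` is shared by every node of one DAG run (an assumption of this sketch).
  def runFailFast[A, B](node: Node[A], result: B, failed: AtomicBoolean)
                       (nodeEval: (Node[A], B) => B)
                       (aggregator: List[B] => B): Future[B] = {
    val nodeResult: Future[B] = Future {
      // Cooperative check: do no work if some other node has already failed
      if (failed.get()) throw new Skipped
      try nodeEval(node, result)
      catch { case NonFatal(e) => failed.set(true); throw e } // record and rethrow
    }
    nodeResult.flatMap { r =>
      Future.sequence(node.children.map(c => runFailFast(c, r, failed)(nodeEval)(aggregator)))
        .map(cs => aggregator(r :: cs))
    }
  }
}
```

This does not interrupt a node that is already executing; it only keeps nodes that have not started yet from doing their work. A long-running `nodeEval` would have to poll the flag itself to stop early.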