如果另一个未来失败了,如何取消未来的行动?

How to cancel a future action if another future did failed?

我有 2 个期货(数据库表上的 2 个操作),我希望在保存修改之前检查两个期货是否已成功完成。

现在,我在第一个里面开始第二个未来(作为依赖),但我知道这不是最好的选择。我知道我可以使用 for-comprehension 并行执行两个 futures,但即使一个失败,另一个也会被执行(尚未测试)

firstFuture.dropColumn(tableName) match {
  case Success(_) => secondFuture.deleteEntity(entity)
  case Failure(e) => throw new Exception(e.getMessage)
}

// the  first future alters a table, drops a column
// the second future deletes a row from another table

在这种情况下,如果第一个 future 成功执行,第二个可能会失败。我想恢复第一个未来的更新。我听说 SQL 交易,好像是这样的,但是怎么办?

val futuresResult = for {
  first <- firstFuture.dropColumn(tableName)
  second <- secondFuture.deleteEntity(entity)
} yield (first, second)

A for-comprehension 在我的情况下要好得多,因为我在这两个 futures 之间没有依赖关系并且可以并行执行,但这并不能解决我的问题,结果可以是(成功, 成功) 或 (失败, 成功) 例如。

关于 Future 运行 顺序 vs 并行:

这有点棘手,因为 Scala Future 被设计为 eager。在处理同步和异步效果的各种 Scala 库中还有一些其他构造,例如 cat IO、Monix TaskZIO 等,它们是在 lazy 中设计的 方式,他们没有这种行为。

Future 急切的一点是它会尽快开始计算。这里 "start" 表示将其安排在显式选择或隐式存在的 ExecutionContext 上。虽然从技术上讲,如果调度程序决定这样做,执行可能会稍微停顿一下,但它很可能几乎会立即启动。

因此,如果您有一个 Future 类型的值,它将从 运行ning 开始。如果你有一个 Future 类型的惰性值,或者 return 是 Future 类型值的函数/方法,那么它不是。

但是即使你只有简单的值(没有惰性 vals 或 defs),如果 Future 定义是在 for-comprehension 内部完成的,那么这意味着它是 monadic flatMap 链的一部分(如果你不明白这个,暂时忽略它)它会运行顺序,而不是并行。为什么?这不是 Futures 特有的;每个 for-comprehension 都具有作为顺序链的语义,您可以在其中将上一步的结果传递到下一步。因此,如果它依赖于步骤 n 中的某些内容,那么您不能 运行 步骤 n + 1 中的某些内容是合乎逻辑的。 =55=]

这里有一些代码可以对此进行演示。

val program = for {
  _ <- Future { Thread.sleep(5000); println("f1") }
  _ <- Future { Thread.sleep(5000); println("f2") }
} yield ()

Await.result(program, Duration.Inf)

此程序将等待五秒钟,然后打印 "f1",然后再等待五秒钟,然后打印 "f2".

现在让我们来看看这个:

val f1 = Future { Thread.sleep(5000); println("f1") }
val f2 = Future { Thread.sleep(5000); println("f2") }

val program = for {
  _ <- f1
  _ <- f2
} yield ()

Await.result(program, Duration.Inf)

然而,该程序将在五秒后同时打印 "f1" 和 "f2"。

请注意,在第二种情况下并没有真正违反序列语义。 f2还有机会使用f1的结果。但是 f2 没有使用 f1 的结果;它是一个可以立即计算的独立值(用 val 定义)。因此,如果我们将 val f2 更改为函数,例如def f2(number: Int),则执行变:

val f1 = Future { Thread.sleep(5000); println("f1"); 42 }
def f2(number: Int) = Future { Thread.sleep(5000); println(number) }

val program = for {
  number <- f1
  _ <- f2(number)
} yield ()

如您所料,这将在五秒后打印 "f1",然后另一个 Future 才会开始,因此它会在再过五秒后打印“42”。

关于交易:

正如@cbley 在评论中提到的,这听起来像是您想要数据库事务。例如,在 SQL 数据库中,这有一个非常 specific meaning and it ensures the ACID properties

如果那是你需要的,你需要在数据库层解决它。 Future 太笼统了;它只是一种模拟同步和异步计算的效果类型。当您看到 Future 值时,仅通过查看类型,您无法判断它是数据库调用的结果还是某些 HTTP 调用的结果。

例如,doobie 将每个数据库查询描述为 ConnectionIO 类型。您可以在一个 for-comprehension 中排列多个查询,就像使用 Future:

一样
val program = for {
  a <- database.getA()
  _ <- database.write("foo")
  b <- database.getB()
} yield {
  // use a and b
}

但与我们之前的示例不同,这里 getA()getB() 不是 return 类型 Future[A] 的值,而是 ConnectionIO[A]。最酷的是 doobie 完全处理了这样一个事实,即您可能希望这些查询在单个事务中 运行,因此如果 getB() 失败,"foo" 将不会被提交数据库。

所以在这种情况下你要做的是获取查询集的完整描述,将其包装成 ConnectionIO 类型的单个值 program,一旦你想 运行 交易,你会做类似 program.transact(myTransactor) 的事情,其中​​ myTransactorTransactor 的一个实例,一个 doobie construct 知道如何连接到你的物理数据库。

一旦您进行交易,您的 ConnectionIO[A] 就会变成 Future[A]。如果事务失败,您将失败 Future,并且不会真正向您的数据库提交任何内容。

如果您的数据库操作相互独立并且可以运行并行,doobie 也可以让您做到这一点。通过 doobie 提交事务,无论是顺序还是并行,都很好地解释了 in the docs