如何在 Scala 中使承诺失败

How to fail promise in Scala

在 Scala 文档中,有一个示例如何通过使用 promises 来选择更成功的 future。

http://docs.scala-lang.org/overviews/core/futures.html#promises

def first[T](f: Future[T], g: Future[T]): Future[T] = {
  val p = promise[T]
  f onSuccess {
    case x => p.trySuccess(x)
  }
  g onSuccess {
    case x => p.trySuccess(x)
  }
  p.future
}

这个函数returns future 先成功,如果其中任何一个失败,它永远不会完成。

是否可以修改为即使其他 future 失败,如果成功则返回第二个,如果两者都成功,则像现在的代码一样选择更快的那个。

您可以添加:

f onFailure {
  case e =>
    g onFailure {
      case _ =>
        p.failure(e)
    }
}

当两个 futures 都失败时,这将使承诺失败,但与 f 相同。如果需要,您可以对此进行详细说明以创建一个记录来自 fg 的 2 个异常的异常。

我建议你听从 Alvin Alexander 对 Scala 中的 futures 和 promises 的建议 here

我相信这是使用期货的最佳方式

package futures

import scala.concurrent.{Future => ConcurrentTask}           // rename
import scala.concurrent.ExecutionContext.Implicits.global
import scala.util.{Failure, Success}
import Utils.sleep

object FutureAsConcurrentTask extends App {

  // run some long-running task (task has type Future[Int] in this example)
  val task = ConcurrentTask {
    Cloud.executeLongRunningTask
  }

  // whenever the task completes, execute this code
  task.onComplete {
    case Success(value) => println(s"Got the callback, value = $value")
    case Failure(e) => println(s"D'oh! The task failed: ${e.getMessage}")
  }

  // do your other work
  println("A ..."); sleep(100)
  println("B ..."); sleep(100)
  println("C ..."); sleep(100)
  println("D ..."); sleep(100)
  println("E ..."); sleep(100)
  println("F ..."); sleep(100)

}

这是一个基本模式,用于选择最快的未来或超时(如果它们都太慢):

import scala.concurrent._
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits.global
import scala.util.{ Success, Failure }
import akka.actor._
import akka.pattern.after

object GetFastestFutureOrTimeout extends App {
  val e = new TimeoutException("TimeoutException")
  val system = ActorSystem("GetFastestFutureOrTimeout")
  val f1 = Future { Thread.sleep(200); "this is f1" }
  val f2 = Future { Thread.sleep(100); "this is f2" }
  val timeoutFuture = after(500.milliseconds, using = system.scheduler) { Future.failed(e) }
  val f = Future.firstCompletedOf(f1 :: f2 :: timeoutFuture :: Nil)

  f onComplete {
    case Success(msg) => println(msg)
    case Failure(err) => println("An error occured: " + err.getMessage)
  }
}

这会打印 "this is f2"。如果将 timeoutFuture 的超时更改为 50,它将打印 "An error occured: TimeoutException".

在幕后,firstCompletedOf 使用 Promise 来 return 第一个已完成的 Future 的值,请参阅 https://github.com/scala/scala/blob/v2.11.6/src/library/scala/concurrent/Future.scala#L503

这是一个基本实现,用于获取最快的成功响应或失败(如果全部失败):

def getFirstSuccessfulResultOrFail[T](requests: List[Future[T]]): Future[T] = {
  val p              = Promise[T]()
  val countDownLatch = AtomicInt(0)

  requests.foreach { f =>
    f.onComplete {
      case Failure(e) => if (countDownLatch.addAndGet(1) == requests.size) p.tryFailure(e)
      case Success(s) => p.trySuccess(s)
    }
  }

  p.future
}