如何确保给定的未来在测试中首先完成?

How to make sure a given future completes first in tests?

我正在为函数编写测试 bar:

def bar(fut1: Future[Int], 
        fut2: Future[Int], 
        fut3: Future[Int]): Future[Result] = ???

bar returns Result 像这样:

 case class Result(
    x: Int,          // fut1 value
    oy: Option[Int], // if fut2 is complete then Some of fut2 value else None 
    oz: Option[Int]  // if fut3 is complete then Some of fut3 value else None 
 ) 

我想为所有测试用例编写测试:

所以我正在为这些测试编写 fake 函数 foo1foo2foo3 的实现。

def foo1(x: Int): Future[Int] = ??? 
def foo2(x: Int): Future[Int] = ??? 
def foo3(x: Int): Future[Int] = ??? 

测试 #1 调用所有这些函数,检查 fut1 是否先完成,然后调用 bar

val fut1 = foo1(0)
val fut2 = foo2(0)
val fut3 = foo3(0)

// make sure `fut1` completes first

测试 #2 调用所有这些函数,确保 fut2 首先完成,然后调用 bar.
测试 #3 调用所有这些函数,确保 fut3 首先完成,然后调用 bar.

我的问题是如何实现函数 foo1foo2foo3 以及测试。

您可以尝试通过 map 将完整性时间戳附加到每个未来,例如:

import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent._
import scala.concurrent.duration._
import scala.language.postfixOps

def foo1(x: Int): Future[Int] = Future {Thread.sleep(200); 1} 
def foo2(x: Int): Future[Int] = Future {Thread.sleep(500); 2} 
def foo3(x: Int): Future[Int] = Future {Thread.sleep(500); 3}

def completeTs[T](future: Future[T]): Future[(T, Long)] = future.map(v => v -> System.currentTimeMillis())

val resutls = Await.result(Future.sequence(
  List(completeTs(foo1(1)), completeTs(foo2(1)), completeTs(foo3(1)))
), 2 seconds)

val firstCompleteTs = resutls.map(_._2).min
val firstCompleteIndex = resutls.indexWhere(_._2 == firstCompleteTs)
assert(firstCompleteIndex == 0)

斯凯蒂:https://scastie.scala-lang.org/L9g78DSNQIm2K1jGlQzXBg

您可以重新利用 firstCompletedOf 来验证期货列表中给定指数的未来是否是第一个完成的指数:

import java.util.concurrent.atomic.AtomicReference
import scala.concurrent.{ExecutionContext, Future, Promise}
import scala.util.Try

def isFirstCompleted[T](idx: Int)(futures: List[Future[T]])(
    implicit ec: ExecutionContext): Future[Boolean] = {
  val promise = Promise[(T, Int)]()
  val pRef = new AtomicReference[Promise[(T, Int)]](promise)
  futures.zipWithIndex foreach { case (f, i) => f onComplete { case tt: Try[T] =>
      val p = pRef.getAndSet(null)
      if (p != null) p tryComplete tt.map((_, i))
    }
  }
  promise.future.map{ case (t, i) => i == idx }
}

测试运行:

import scala.concurrent.ExecutionContext.Implicits.global

val futures = List(
  Future{Thread.sleep(100); 1},
  Future{Thread.sleep(150); throw new Exception("oops!")},
  Future{Thread.sleep(50); 3}
)

isFirstCompleted(0)(futures)  // Future(Success(false))
isFirstCompleted(2)(futures)  // Future(Success(true))

为了编写测试用例,考虑使用ScalaTest AsyncFlatSpec

不清楚您要测试的到底是什么。 如果您只是使用已经完成的期货,您将得到您描述的行为:

def f1 = Future.successful(1)
def f2 = Future.successful(2)
def f3 = Future.successful(3)

eventually {
   Future.firstCompletedOf(Seq(f1, f2, f3)).value shouldBe Some(1)
}

(请注意,您 不能 像您在问题中所做的那样直接与 fut1 进行比较,这将永远是错误的,因为 .firstCompletedOf returns一个新的未来)。

你也可以只完成一个 future,其他的不要管:

    val promise = Promise[Int].future
    def f1 = promise.future // or just Future.successful(1) ... or Future(1)
    def f2 = Future.never
    def f3 = Future.never
  
    result =  Future.firstCompletedOf(Seq(f1, f2, f3))
    promise.complete(Success(1)) 
    eventually {
       result.value shouldBe 1
    }

等等......可以让其他期货也得到他们自己的承诺的支持,例如,如果你希望它们最终全部完成(不确定它会给你带来什么,但话又说回来,我不确定您在这里开始测试的内容)。

另一种可能是让他们互相依赖:

    val promise = Promise[Int]
    def f1 = promise.future
    def f2 = promise.future.map(_ + 1)
    def f3 = promise.future.map(_ + 2)
    
    ... 
    promise.complete(Success(1))