如何在 Scala 中实现 Future as Applicative?

How to implement Future as Applicative in Scala?

假设我需要运行 两个并发计算,等待它们,然后合并它们的结果。更具体地说,我需要同时 运行 f1: X1 => Y1f2: X2 => Y2 然后调用 f: (Y1, Y2) => Y 最终得到 Y.

的值

我可以创建未来的计算fut1: X1 => Future[Y1]fut2: X2 => Future[Y2],然后使用单子组合将它们组合起来得到fut: (X1, X2) => Future[Y]

问题在于单子组合意味着顺序等待。在我们的例子中,这意味着我们首先等待一个未来 然后 我们将等待另一个。例如。如果需要 2 秒。到第一个未来完成,只需 1 秒。到第二个失败的未来,我们浪费了 1 秒。

因此看起来我们需要一个 applicative 期货组合来等待 要么 都完成或至少一个期货失败。是否有意义 ?您将如何为期货实施 <*>

不需要顺序。未来的计算可能会在未来创建的那一刻开始。当然,如果 future 是由 flatMap 参数创建的(如果需要第一次计算的结果,则必须如此),那么它将是顺序的。但是在

这样的代码中
val f1 = Future {....}
val f2 = Future {....}
for (a1 <- f1; a2 <- f2) yield f(a1, a2)

你得到并发执行。

所以Monad隐含的Applicative实现是可以的。

您的 post 似乎包含两个或多或少独立的问题。 我将首先解决 运行 两个并发计算的具体实际问题。关于Applicative的问题在最后得到解答

假设你有两个异步函数:

val f1: X1 => Future[Y1]
val f2: X2 => Future[Y2]

还有两个值:

val x1: X1
val x2: X2  

现在您可以通过多种不同的方式开始计算。让我们来看看其中的一些。

for 之外开始计算(并行)

假设你这样做:

val y1: Future[Y1] = f1(x1)
val y2: Future[Y2] = f2(x2)

现在,计算 f1f2 已经 运行ning。收集结果的顺序无关紧要。你可以用 for-comprehension:

val y: Future[(Y1,Y2)] = for(res1 <- y1; res2 <- y2) yield (res1,res2)

for-comprehension中使用表达式y1y2不影响y1y2的计算顺序,它们仍在并行计算。

for 内开始计算(顺序)

如果我们简单地把y1y2的定义,直接代入到for推导中,我们还是会得到相同的结果,只是执行顺序不同会有所不同:

val y = for (res1 <- f1(x1); res2 <- f2(x2)) yield (res1, res2)

转换为

val y = f1(x1).flatMap{ res1 => f2(x2).map{ res2 => (res1, res2) } }

特别是,第二次计算在第一次计算结束后开始。这通常不是一个人想要的。

这里违反了一个基本的替换原则。如果没有副作用,可能可以将此版本转换为之前的版本,但在 Scala 中,必须明确地处理执行顺序。

压缩期货(平行)

期货尊重产品。有一种方法Future.zip,可以让你这样做:

val y = f1(x1) zip f2(x2)

这将运行两个计算并行,直到两个都完成,或者直到其中一个失败。

演示

这里有一个演示此行为的小脚本(受 muhuk 的 post 启发):

import scala.concurrent._
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits.global
import java.lang.Thread.sleep
import java.lang.System.{currentTimeMillis => millis}

var time: Long = 0

val x1 = 1
val x2 = 2

// this function just waits
val f1: Int => Future[Unit] = { 
  x => Future { sleep(x * 1000) }
}

// this function waits and then prints
// elapsed time
val f2: Int => Future[Unit] = {
  x => Future { 
    sleep(x * 1000)
    val elapsed = millis() - time
    printf("Time: %1.3f seconds\n", elapsed / 1000.0)
  }
}

/* Outside `for` */ {
  time = millis()
  val y1 = f1(x1)
  val y2 = f2(x2)
  val y = for(res1 <- y1; res2 <- y2) yield (res1,res2)
  Await.result(y, Duration.Inf)
}

/* Inside `for` */ {
  time = millis()
  val y = for(res1 <- f1(x1); res2 <- f2(x2)) yield (res1, res2)
  Await.result(y, Duration.Inf)
}

/* Zip */ {
  time = millis()
  val y = f1(x1) zip f2(x2)
  Await.result(y, Duration.Inf)
}

输出:

Time: 2.028 seconds
Time: 3.001 seconds
Time: 2.001 seconds

适用

使用 your other post 中的定义:

trait Applicative[F[_]] {
  def apply[A, B](f: F[A => B]): F[A] => F[B]
}

可以这样做:

object FutureApplicative extends Applicative[Future] {
  def apply[A, B](ff: Future[A => B]): Future[A] => Future[B] = {
    fa => for ((f,a) <- ff zip fa) yield f(a)
  }
}

但是,我不确定这与您的具体问题有什么关系,或者与可理解和可读的代码有什么关系。一个 Future 已经 一个 monad(这比 Applicative 强),甚至还有它的内置语法,所以我没有看到任何在此处添加一些 Applicative 的好处。

The problem is that monadic composition implies sequential wait. In our case it implies that we wait for one future first and then we will wait for another.

不幸的是,这是真的。

import java.util.Date
import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global

object Test extends App {
        def timestamp(label: String): Unit = Console.println(label + ": " + new Date().getTime.toString)

        timestamp("Start")
        for {
                step1 <- Future {
                        Thread.sleep(2000)
                        timestamp("step1")
                }
                step2 <- Future {
                        Thread.sleep(1000)
                        timestamp("step2")
                }
        } yield { timestamp("Done") }

        Thread.sleep(4000)
}

运行宁此代码输出:

Start: 1430473518753
step1: 1430473520778
step2: 1430473521780
Done: 1430473521781

Thus it looks like we need an applicative composition of the futures to wait till either both complete or at least one future fails.

我不确定应用组合与并发策略有什么关系。使用 for comprehensions,如果所有 futures 都完成,你会得到一个结果,如果其中任何一个失败,你会得到一个失败。所以它在语义上是相同的。

为什么他们运行按顺序

我认为期货顺序 运行 的原因是因为 step1step2 内可用(以及在其余计算中)。本质上我们可以将 for 块转换为:

def step1() = Future {
    Thread.sleep(2000)
    timestamp("step1")
}
def step2() = Future {
    Thread.sleep(1000)
    timestamp("step2")
}
def finalStep() = timestamp("Done")
step1().flatMap(step1 => step2()).map(finalStep())

因此,前面计算的结果可用于其余步骤。它在这方面不同于 <?> & <*>

如何运行并行期货

@andrey-tyukin 的代码 运行s 并行期货:

import java.util.Date
import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global

object Test extends App {
        def timestamp(label: String): Unit = Console.println(label + ": " + new Date().getTime.toString)

        timestamp("Start")
        (Future {
                Thread.sleep(2000)
                timestamp("step1")
        } zip Future {
                Thread.sleep(1000)
                timestamp("step2")
        }).map(_ => timestamp("Done"))
        Thread.sleep(4000)
}

输出:

Start: 1430474667418
step2: 1430474668444
step1: 1430474669444
Done: 1430474669446

None 其他答案中的方法在快速失败的未来加上长时间后成功的未来的情况下做正确的事情。

但是这样的方法可以手动实现:

def smartSequence[A](futures: Seq[Future[A]]): Future[Seq[A]] = {
  val counter = new AtomicInteger(futures.size)
  val result = Promise[Seq[A]]()

  def attemptComplete(t: Try[A]): Unit = {
    val remaining = counter.decrementAndGet
    t match {
      // If one future fails, fail the result immediately
      case Failure(cause) => result tryFailure cause
      // If all futures have succeeded, complete successful result
      case Success(_) if remaining == 0 => 
        result tryCompleteWith Future.sequence(futures)
      case _ =>
    }
  }

  futures.foreach(_ onComplete attemptComplete)
  result.future
}

ScalaZ 在内部做类似的事情,所以 f1 |@| f2List(f1, f2).sequence 在任何期货失败后都会立即失败。

下面是对这些方法失败时间的快速测试:

import java.util.Date
import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global
import scalaz._, Scalaz._

object ReflectionTest extends App {
  def f1: Future[Unit] = Future {
    Thread.sleep(2000)
  }

  def f2: Future[Unit] = Future {
    Thread.sleep(1000)
    throw new RuntimeException("Failure")
  }

  def test(name: String)(
    f: (Future[Unit], Future[Unit]) => Future[Unit]
  ): Unit = {
    val start = new Date().getTime
    f(f1, f2).andThen {
      case _ => 
        println(s"Test $name completed in ${new Date().getTime - start}")
    }
    Thread.sleep(2200)
  }

  test("monadic") { (f1, f2) => for (v1 <- f1; v2 <- f2) yield () }

  test("zip") { (f1, f2) => (f1 zip f2).map(_ => ()) }

  test("Future.sequence") { 
    (f1, f2) => Future.sequence(Seq(f1, f2)).map(_ => ()) 
  }

  test("smartSequence") { (f1, f2) => smartSequence(Seq(f1, f2)).map(_ => ())}

  test("scalaz |@|") { (f1, f2) => (f1 |@| f2) { case _ => ()}}

  test("scalaz sequence") { (f1, f2) => List(f1, f2).sequence.map(_ => ())}

  Thread.sleep(30000)
}

我机器上的结果是:

Test monadic completed in 2281
Test zip completed in 2008
Test Future.sequence completed in 2007
Test smartSequence completed in 1005
Test scalaz |@| completed in 1003
Test scalaz sequence completed in 1005