运行 两个 scala 函数并行,5 分钟后返回最新值

Running two scala functions in parallel, returning the latest value after 5 minutes

我有两个 Scala 函数 运行。每一个都像下面这样,它们开始提高一个变量的值,我想在 5 分钟(或其他时间)后同时 运行 它们。我想终止这两个函数并获取它们到那个时候的最新值。

def func1(n: Int): Double = {
   var a = 0.0D
   while (not terminated) {
       /// improve value of 'a' with algorithm 1 
   }
}

def func2(n: Int): Double = {
   var a = 0.0D
   while (not terminated) {
       /// improve value of 'a' with algorithm 2 
   }
}

我想知道我应该如何构建我的代码来做到这一点,这里的最佳实践是什么?我正在考虑 运行 在两个不同的线程中使用超时和 return 它们在超时时的最新值。但似乎还有其他方法可以做到这一点。我是 Scala 的新手,所以任何见解都会非常有帮助。

不难。这是一种方法:

  @volatile var terminated = false

  def func1(n: Int): Double = {
    var a = 0.0D
    while (!terminated) {
      a = 0.0001 + a * 0.99999; //some useless formula1
    }
    a
  }

  def func2(n: Int): Double = {
    var a = 0.0D
    while (!terminated) {
      a += 0.0001  //much simpler formula2, just for testing
    }
    a
  }


  def main(args: Array[String]): Unit = {

    val f1 = Future { func1(1) } //work starts here

    val f2 = Future { func2(2) } //and here

    //aggregate results into one common future
    val aggregatedFuture = for{
      f1Result <- f1
      f2Result <- f2
    } yield (f1Result, f2Result)

    Thread.sleep(500) //wait here for some calculations in ms
    terminated = true //this is where we actually command to stop

    //since looping to while() takes time, we need to wait for results
    val res = Await.result(aggregatedFuture, 50.millis)
    //just a printout
    println("results:" + res)

 }

但是,当然,您可能希望查看 while 循环并创建更易于管理和链接的计算。

输出:results:(9.999999999933387,31206.34691883926)

我不是 100% 确定这是否是您想要做的事情,但这是一种方法(不是 5 分钟,但您可以更改它):

object s
{
    def main(args: Array[String]): Unit = println(run())

    def run(): (Int, Int) =
    {
        val (s, numNanoSec, seedVal) = (System.nanoTime, 500000000L, 0)
        Seq(f1 _, f2 _).par.map(f => 
        {
            var (i, id) = f(seedVal)
            while (System.nanoTime - s < numNanoSec)
            {
                i = f(i)._1
            }
            (i, id)
        }).seq.maxBy(_._1)
    }
    def f1(a: Int): (Int, Int) = (a + 1, 1)
    def f2(a: Int): (Int, Int) = (a + 2, 2)
}

输出:

me@ideapad:~/junk> scala s.scala 
(34722678,2)
me@ideapad:~/junk> scala s.scala 
(30065688,2)
me@ideapad:~/junk> scala s.scala 
(34650716,2)

当然,这一切都假设您至少有两个线程可用于分配任务。

您可以使用 FutureAwait 结果来做到这一点:

  def fun2(): Double = {
    var a = 0.0f
    val f = Future {
      // improve a with algorithm 2 
      a
    }
    try {
      Await.result(f, 5 minutes)
    } catch {
      case e: TimeoutException => a
    }
  }

使用Await.result等待algorithm超时,当我们遇到这个超时,我们return直接a