在多线程环境中隐式 class 持有可变变量

Implicit class holding mutable variable in multithreaded environment

我需要实现一个 parallel 方法,它需要两个计算块,ab,并在一个新线程中启动它们中的每一个。该方法必须 return 一个包含两个计算结果值的元组。它应该具有以下签名:

def parallel[A, B](a: => A, b: => B): (A, B)

我设法通过直接使用类似 Java 的方法解决了这个练习。然后我决定用隐式 class 来弥补一个解决方案。在这里:

object ParallelApp extends App {

  implicit class ParallelOps[A](a: => A) {
    var result: A = _

    def spawn(): Unit = {

      val thread = new Thread {
        override def run(): Unit = {
          result = a
        }
      }
      thread.start()
      thread.join()
    }
  }

  def parallel[A, B](a: => A, b: => B): (A, B) = {
    a.spawn()
    b.spawn()
    (a.result, b.result)

  }

  println(parallel(1 + 2, "a" + "b"))

}

由于未知原因,我收到输出 (null,null)。能指出问题出在哪里吗?

编辑:修复了 Andrey Tyukin 指出的错误

解决问题的一种方法是使用 Scala Futures

Documentation. TutorialUseful Klang Blog.

您通常需要这些库的某种组合:

import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.{Await, Future}
import scala.util.{Failure, Success}
import scala.concurrent.duration._

一个异步示例:

def parallelAsync[A,B](a: => A, b: => B): Future[(A,B)] = {
  // as per Andrey Tyukin's comments, this line runs
  // the two futures sequentially and we do not get
  // any benefit from it.  I will leave this line here
  // so others will not fall in my trap
  //for {i <- Future(a); j <- Future(b) } yield (i,j)
  Future(a) zip Future(b)
}

parallelAsync(1 + 2, "a" + "b").onComplete {
  case Success(x) => println(x)
  case Failure(e) => e.printStackTrace()
}

如果你必须阻塞直到两个都完成,你可以使用这个:

def parallelSync[A,B](a: => A, b: => B): (A,B) = {
  // see comment above
  //val f = for { i <- Future(a); j <- Future(b) } yield (i,j)
  val tuple = Future(a) zip Future(b)
  Await.result(tuple, 5 second)
}

println(parallelSync(3 + 4, "c" + "d"))

当运行这些小例子的时候,不要忘记在最后睡一会儿,这样程序就不会在结果回来之前结束

Thread.sleep(3000)

剧透警告:并不复杂。这很有趣,就像一个魔术(如果您考虑阅读有关 Java 内存模型 "funny" 的文档)。如果你还没有弄明白,我强烈建议你试着弄清楚,否则就不好笑了。有人应该从中解出 "division-by-zero proves 2 = 4"-谜语。


考虑以下较短的示例:

implicit class Foo[A](a: A) {
  var result: String = "not initialized"
  def computeResult(): Unit = result = "Yay, result!"
}

val a = "a string"
a.computeResult()

println(a.result)

当运行时,打印

not initialized

尽管我们调用了 computeResult() 并将 result 设置为 "Yay, result!"。问题在于 a.computeResult()a.result 这两个调用属于 Foo 的两个完全独立的实例。隐式转换执行了两次,第二个隐式创建的对象对第一个隐式创建的对象的变化一无所知。它与线程或 JMM 完全无关。

顺便说一句:您的代码不是并行的。在调用 start 之后立即调用 join 不会给您带来任何好处,您的主线程只会闲置并等待另一个线程完成。在任何时候都不会有两个线程同时执行任何有用的工作。