REPL 中 Scala 中 java.util.concurrent._ 的死锁

Deadlocks with java.util.concurrent._ in Scala in REPL

我在研究 Paul Chiusano 和 Runar Bjanarson 的书 "Functional Programming in Scala"(第 7 章 - 纯函数式并行)时遇到了以下情况。

    package fpinscala.parallelism

    import java.util.concurrent._
    import language.implicitConversions


    object Par {
      type Par[A] = ExecutorService => Future[A]

      def run[A](s: ExecutorService)(a: Par[A]): Future[A] = a(s)

      def unit[A](a: A): Par[A] = (es: ExecutorService) => UnitFuture(a) // `unit` is represented as a function that returns a `UnitFuture`, which is a simple implementation of `Future` that just wraps a constant value. It doesn't use the `ExecutorService` at all. It's always done and can't be cancelled. Its `get` method simply returns the value that we gave it.

      private case class UnitFuture[A](get: A) extends Future[A] {
        def isDone = true
        def get(timeout: Long, units: TimeUnit) = get
        def isCancelled = false
        def cancel(evenIfRunning: Boolean): Boolean = false
      }

      def map2[A,B,C](a: Par[A], b: Par[B])(f: (A,B) => C): Par[C] = // `map2` doesn't evaluate the call to `f` in a separate logical thread, in accord with our design choice of having `fork` be the sole function in the API for controlling parallelism. We can always do `fork(map2(a,b)(f))` if we want the evaluation of `f` to occur in a separate thread.
        (es: ExecutorService) => {
          val af = a(es)
          val bf = b(es)
          UnitFuture(f(af.get, bf.get)) // This implementation of `map2` does _not_ respect timeouts. It simply passes the `ExecutorService` on to both `Par` values, waits for the results of the Futures `af` and `bf`, applies `f` to them, and wraps them in a `UnitFuture`. In order to respect timeouts, we'd need a new `Future` implementation that records the amount of time spent evaluating `af`, then subtracts that time from the available time allocated for evaluating `bf`.
        }

      def fork[A](a: => Par[A]): Par[A] = // This is the simplest and most natural implementation of `fork`, but there are some problems with it--for one, the outer `Callable` will block waiting for the "inner" task to complete. Since this blocking occupies a thread in our thread pool, or whatever resource backs the `ExecutorService`, this implies that we're losing out on some potential parallelism. Essentially, we're using two threads when one should suffice. This is a symptom of a more serious problem with the implementation, and we will discuss this later in the chapter.
        es => es.submit(new Callable[A] {
          def call = a(es).get
        })

      def lazyUnit[A](a: => A): Par[A] = fork(unit(a))

 def equal[A](e: ExecutorService)(p: Par[A], p2: Par[A]): Boolean =
    p(e).get == p2(e).get

}

您可以在 Github here. See here 上找到 java.util.concurrent 文档的原始代码。

我很关心 fork 的实施。特别是,据称 fork 当 ThreadPool 太小时会导致死锁。

我考虑下面的例子:

val a = Par.lazyUnit(42 + 1)
val es: ExecutorService = Executors.newFixedThreadPool(2)
println(Par.fork(a)(es).get)  

我不希望这个例子以死锁结束,因为有两个线程。然而,当我在 Scala REPL 中 运行 它时,它会在我的计算机上运行。为什么会这样?

初始化ExecutorService时的输出是 es: java.util.concurrent.ExecutorService =

java.util.concurrent.ThreadPoolE
xecutor@73a86d72[Running, pool size = 0, active threads = 0, queued tasks =
 0, completed tasks = 0]

这里pool size = 0正确吗?也就是说,这是不理解的问题java.util.concurrent._还是不理解Scala部分的问题?

好的,经过长时间的调查,我相信我有答案了。完整的故事很长,但我会尽量通过简化和避免许多细节来缩短它。

注意:可能 Scala 可以编译到各种不同的目标平台,但这个特定问题发生在 Java/JVM 作为目标,所以这就是这个答案的内容.

您看到的死锁与线程池的大小无关。实际上挂起的是外部 fork 调用。它与 REPL 实现细节和多线程的结合有关,但需要学习一些知识才能理解它是如何发生的:

  • Scala REPL 的工作原理
  • Scala 如何将 objects 编译为 Java/JVM
  • Scala 如何在 Java/JVM
  • 上模拟别名参数
  • 如何 Java/JVM 运行 是 classes
  • 的静态初始值设定项

一个较短的(er)版本(另请参阅末尾的摘要)是这段代码挂在 REPL 下,因为当它被 REPL 执行时,它在逻辑上是类似于下面的代码:

object DeadLock {

  import scala.concurrent._
  import scala.concurrent.duration.Duration
  import scala.concurrent.ExecutionContext.Implicits.global

  val foo: Int = Await.result(Future(calc()), Duration.Inf)

  def printFoo(): Unit = {
    println(s"Foo = $foo")
  }

  private def calc(): Int = {
    println("Before calc")
    42
  }
}


def test(): Unit = {
  println("Before printFoo")
  DeadLock.printFoo()
  println("After printFoo")
} 

或在 Java 世界中非常相似:

class Deadlock {
    static CompletableFuture<Integer> cf;
    static int foo;

    public static void printFoo() {
        System.out.println("Print foo " + foo);
    }

    static {
        cf = new CompletableFuture<Integer>();
        new Thread(new Runnable() {
            @Override
            public void run() {
                calcF();
            }
        }).start();
        try {
            foo = cf.get();
            System.out.println("Future result = " + cf.get());
        } catch (InterruptedException e) {
            e.printStackTrace();f
        } catch (ExecutionException e) {
            e.printStackTrace();
        }
    }


    private static void calcF() {
        cf.complete(42);
    }
}

public static void main(String[] args) {
    System.out.println("Before foo");
    Deadlock.printFoo();
    System.out.println("After foo");
}

如果您清楚为什么这段代码会死锁,那么您已经了解了大部分内容,并且可能可以自己推断出其余部分。您可能只浏览最后的 摘要 部分。

Java 静态初始化程序如何死锁?

让我们从这个故事的结尾开始:为什么 Java 代码挂起?这是因为 Java/JVM 对静态初始化程序的两个保证(有关更多详细信息,请参阅 JLS 的 12.4.2. Detailed Initialization Procedure 部分):

  • 静态初始化程序将 运行 在 "external" 使用 class

  • 之前
  • 静态初始化器将 运行 恰好一次,它是通过全局锁定完成的

用于静态初始化程序的锁是隐式的,由 JVM 管理,但它确实存在。这意味着代码在逻辑上类似于这样的东西:

class Deadlock {

    static boolean staticInitFinished = false;
    // unique value for each thread!
    static ThreadLocal<Boolean> currentThreadRunsStaticInit = ThreadLocal.withInitial(() -> Boolean.FALSE);


    static CompletableFuture<Integer> cf;
    static int foo;

    static void enforceStaticInit() {
        synchronized (Deadlock.class) {
            // is init finished?
            if (staticInitFinished)
                return;
            // are we the thread already running the init?
            if(currentThreadRunsStaticInit.get())
                return;
            currentThreadRunsStaticInit.set(true);

            cf = new CompletableFuture<Integer>();
            new Thread(new Runnable() {
                @Override
                public void run() {
                    calcF();
                }
            }).start();
            try {
                foo = cf.get();
                System.out.println("Future result = " + cf.get());
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (ExecutionException e) {
                e.printStackTrace();
            }
            currentThreadRunsStaticInit.set(false);
            staticInitFinished = true;
        }
    }

    private static void calcF() {
        enforceStaticInit();
        cf.complete(42);
    }

    public static void printFoo() {
        enforceStaticInit();
        System.out.println("Print foo " + foo);
    }
}

现在很清楚为什么这段代码会死锁:我们的静态初始化程序启动一个新线程并阻塞等待它的结果。但是那个新线程试图访问相同的 class (calcF 方法)并且作为另一个线程它必须等待已经 运行ning 静态初始化器完成。请注意,如果 calcF 方法在另一个 class 中,则一切正常。

Scala REPL 的工作原理

现在让我们回到故事的开头,了解 Scala REPL 的工作原理。这个答案是对真实交易的极大简化,但它抓住了这种情况的重要细节。对于 REPL 实现者来说幸运的是,Scala 编译器是用 Scala 编写的。这意味着 REPL 不必以某种方式解释代码,它可以 运行 通过标准编译器,然后 运行 通过 Java 反射 API 编译代码。这仍然需要对代码进行一些修饰才能使编译器满意并取回结果。

当您输入类似

的内容时,将其简化一点(或好吧,很多)
val a = Par.lazyUnit(42 + 1)

在 REPL 中,代码被分析并转换成这样的东西:

package line3

object read {
    val a = Par.lazyUnit(42 + 1)
    val res3 = a
}

object eval {
    def print() = {
        println("a: Par.Par[Int] = " + read.res3)
    }
}

然后通过反射调用line3.eval.print()

类似的故事发生在:

val es: ExecutorService = Executors.newFixedThreadPool(2)

最后当你这样做时

Par.fork(a)(es).get

事情变得更有趣了,因为您依赖于使用 imports:

巧妙实现的前几行
package line5

object read {
    import line2.read.Par
    import line3.read.a
    import line4.read.es

    val res5 = Par.fork(a)(es).get
}

object eval {
    def print() = {
        println("res5: Int = " + read.res5)
    }
}

这里重要的事情是你写进REPL的所有东西都被包装成一个全新的object然后编译和 运行 作为通常的代码。

Scala 如何在 Java/JVM

上模拟按名称的参数

fork方法的定义使用了by-name parameter:

def fork[A](a: => Par[A]): Par[A] =

这里用来懒惰地计算a,这对fork的整个逻辑至关重要。 Java/JVM 没有对惰性求值的标准支持,但它可以被模拟,这就是 Scala 编译器所做的。在内部,签名更改为使用 Function0:

def fork[A](aWrapper: () => Par[A]): Par[A] = 

并且对 a 的每次访问都被对 aWrapper.apply() 的调用所取代。神奇的另一部分发生在带有别名参数的方法的调用方:那里的参数也应该被包装到 Function0 中,这样代码就变成了

object read {
    import line2.read.Par
    import line3.read.a
    import line4.read.es

    val res5 = Par.fork(() => a)(es).get
}

但实际上有点不同。天真地,仅仅为了这个小函数就需要另一个 class ,这对于这样一个简单的逻辑来说感觉很浪费。在 Scala 2.12 的实践中,使用了 Java 8 LambdaMetafactory 的魔法,所以代码真的变成了

object read {
    import line2.read.Par
    import line3.read.a
    import line4.read.es

    def aWrapper():Int = a

    val res5 = Par.fork(aWrapper _)(es).get
}

其中 aWrapper _ 表示将方法转换为 Funciton0,这是通过 LambdaMetafactory 完成的。正如您可能从有关 Java 静态初始化程序死锁的章节中所怀疑的那样,def aWrapper 的引入是一个 关键差异 .您已经可以看到此代码与挂起的答案中的第一个 Scala 片段非常相似。

Scala 如何在 Java/JVM

上编译 object

最后一块拼图是 Scala object 是如何在 Java/JVM 中编译的。好吧,它实际上被编译为类似于 "static class" 的东西,但由于您可以使用 object 作为对象参数,因此它必须更复杂一些。实际上,所有的初始化逻辑都被移到了 object class 的构造函数中,并且有一个简单的静态初始化程序调用它。所以我们在 Java 中的最后一个 read 对象(忽略 imports)看起来像这样:

class read$ {
    static read$ MODULE$

    static {
        new read$()
    }

    private Par[Int] res5;

    private read$() {
        MODULE$ = this;
        res5 = Par.fork(read$::aWrapper)(es).get
    }

    private static int aWrapper(){
        return line3.read$.MODULE$.a;
    }
}

这里再次read$::aWrapper表示使用LambdaMetafactoryaWrapper方法构建Function0。换句话说,Scala object 初始化被翻译成 运行 作为 [ 的一部分的代码=139=]Java 静态初始化器.

总结

总结事情是如何搞砸的:

  • REPL 将你的代码变成每行的新 object 并编译它

  • object初始化逻辑被翻译成Java静态初始化逻辑

  • 在简单情况下,调用带有别名参数的方法被转换为包装 "return the value" 逻辑的方法,并且该方法被添加到相同的 classobject

  • Par.fork 作为 object 初始化的一部分执行(即 Java 静态初始化程序的一部分)尝试评估 by-name 参数(即在不同的线程上调用相同 class 上的方法)并阻塞等待该线程的结果

  • Java 静态初始化程序在逻辑上是在全局锁下执行的,因此它会阻止调用该方法的不同线程。但它本身被阻止等待该方法调用完成。