为什么我在多线程 Scala 中出现竞争条件?

Why am I getting a race condition in multi-threading scala?

我正在尝试对数组进行 p 范数计算。

为了实现这一点,我尝试了以下方法,我知道我可以用不同的方式解决这个问题,但我有兴趣了解竞争条件发生的位置,

val toSum = Array(0,1,2,3,4,5,6)

// Calculate the sum over a segment of an array
def sumSegment(a: Array[Int], p:Double, s: Int, t: Int): Int = {
  val res = {for (i <- s until t) yield scala.math.pow(a(i), p)}.reduceLeft(_ + _)
  res.toInt
}

// Calculate the p-norm over an Array a
def parallelpNorm(a: Array[Int], p: Double): Double = {
  var acc = 0L

  // The worker who should calculate the sum over a slice of an array
  class sumSegmenter(s: Int, t: Int) extends Thread {
    override def run() {
      // Calculate the sum over the slice
      val subsum = sumSegment(a, p, s, t)
      // Add the sum of the slice to the accumulator in a synchronized fashion
      val x = new AnyRef{}
      x.synchronized {
        acc = acc + subsum
      }
    }
  }

  val split = a.size  / 2
  val seg_one = new sumSegmenter(0, split)
  val seg_two = new sumSegmenter(split, a.size)
  seg_one.start
  seg_two.start
  seg_one.join
  seg_two.join
  scala.math.pow(acc, 1.0 / p)
}
println(parallelpNorm(toSum, 2))

预期输出是 9.5393920142,但有些运行结果是 9.273618495495704 甚至 2.23606797749979。

任何可能发生竞争条件的建议?

val x = new AnyRef{} 移到 sumSegmenter 之外(即移入 parallelpNorm)——问题是 each 线程正在使用自己的线程互斥量而不是共享一个。

这个问题已经在之前的答案中解释过了,但是避免这种竞争条件并提高性能的更好方法是使用 AtomicInteger

// Calculate the p-norm over an Array a
def parallelpNorm(a: Array[Int], p: Double): Double = {
  val acc = new AtomicInteger(0)

  // The worker who should calculate the sum over a slice of an array
  class sumSegmenter(s: Int, t: Int) extends Thread {
    override def run() {
      // Calculate the sum over the slice
      val subsum = sumSegment(a, p, s, t)
      // Add the sum of the slice to the accumulator in a synchronized fashion
      acc.getAndAdd(subsum)
    }
  }

  val split = a.length / 2
  val seg_one = new sumSegmenter(0, split)
  val seg_two = new sumSegmenter(split, a.length)
  seg_one.start()
  seg_two.start()
  seg_one.join()
  seg_two.join()
  scala.math.pow(acc.get, 1.0 / p)
}

现代处理器可以在不阻塞的情况下执行原子操作,这比显式同步要快得多。在我的测试中,它的运行速度是原始代码的两倍(正确放置 x