Why am I getting a race condition in multi-threading scala?
I am trying to do a p-norm calculation on an array.
To achieve this I tried the approach below. I know I could solve this differently, but I am interested in understanding where the race condition occurs.
val toSum = Array(0,1,2,3,4,5,6)

// Calculate the sum over a segment of an array
def sumSegment(a: Array[Int], p: Double, s: Int, t: Int): Int = {
  val res = { for (i <- s until t) yield scala.math.pow(a(i), p) }.reduceLeft(_ + _)
  res.toInt
}

// Calculate the p-norm over an Array a
def parallelpNorm(a: Array[Int], p: Double): Double = {
  var acc = 0L

  // The worker who should calculate the sum over a slice of an array
  class sumSegmenter(s: Int, t: Int) extends Thread {
    override def run() {
      // Calculate the sum over the slice
      val subsum = sumSegment(a, p, s, t)
      // Add the sum of the slice to the accumulator in a synchronized fashion
      val x = new AnyRef{}
      x.synchronized {
        acc = acc + subsum
      }
    }
  }

  val split = a.size / 2
  val seg_one = new sumSegmenter(0, split)
  val seg_two = new sumSegmenter(split, a.size)
  seg_one.start
  seg_two.start
  seg_one.join
  seg_two.join
  scala.math.pow(acc, 1.0 / p)
}

println(parallelpNorm(toSum, 2))
The expected output is 9.5393920142, but some runs give 9.273618495495704 or even 2.23606797749979.
Any suggestions as to where the race condition might be happening?
Move val x = new AnyRef{} out of sumSegmenter (i.e. into parallelpNorm). The problem is that each thread is creating and locking its own mutex rather than sharing a single one, so the synchronized block never actually excludes the other thread.
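For reference, a minimal sketch of that fix (the name parallelpNormFixed and the lock value are mine for illustration; it assumes the asker's sumSegment is in scope). The lock object is created once in the enclosing method, so both worker threads synchronize on the same monitor:

def parallelpNormFixed(a: Array[Int], p: Double): Double = {
  var acc = 0L
  // One shared monitor for all worker threads
  val lock = new AnyRef

  // The worker that calculates the sum over a slice of the array
  class sumSegmenter(s: Int, t: Int) extends Thread {
    override def run(): Unit = {
      val subsum = sumSegment(a, p, s, t)
      // Every thread now locks the same object, so the updates cannot interleave
      lock.synchronized {
        acc = acc + subsum
      }
    }
  }

  val split = a.length / 2
  val seg_one = new sumSegmenter(0, split)
  val seg_two = new sumSegmenter(split, a.length)
  seg_one.start()
  seg_two.start()
  seg_one.join()
  seg_two.join()
  scala.math.pow(acc.toDouble, 1.0 / p)
}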
This was already explained in the previous answer, but a better way to avoid this race condition, and to improve performance, is to use an AtomicInteger:
import java.util.concurrent.atomic.AtomicInteger

// Calculate the p-norm over an Array a
def parallelpNorm(a: Array[Int], p: Double): Double = {
  val acc = new AtomicInteger(0)

  // The worker who should calculate the sum over a slice of an array
  class sumSegmenter(s: Int, t: Int) extends Thread {
    override def run(): Unit = {
      // Calculate the sum over the slice
      val subsum = sumSegment(a, p, s, t)
      // Atomically add the sum of the slice to the accumulator
      acc.getAndAdd(subsum)
    }
  }

  val split = a.length / 2
  val seg_one = new sumSegmenter(0, split)
  val seg_two = new sumSegmenter(split, a.length)
  seg_one.start()
  seg_two.start()
  seg_one.join()
  seg_two.join()
  scala.math.pow(acc.get, 1.0 / p)
}
Modern processors can perform atomic operations without blocking, which is much faster than explicit synchronization. In my tests this ran about twice as fast as the original code (with x placed correctly).
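As a quick sanity check (assuming the AtomicInteger version above replaces the original parallelpNorm), the call from the question should now reliably print the expected value:

println(parallelpNorm(toSum, 2)) // ~9.539392014169456, i.e. the square root of 0+1+4+9+16+25+36 = 91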