Scala:使用 ListBuffer 附加的并行执行不会产生预期的结果

Scala: Parallel execution with ListBuffer appends doesn't produce expected outcome

我知道我在 mutable.ListBuffer 上做错了,但我不知道如何解决它(以及对问题的正确解释)。

我简化了下面的代码以重现该行为。

我基本上是在尝试 运行 并行运行,以便在处理第一个列表时将元素添加到列表中。我最终得到 "losing" 个元素。

import java.util.Properties

import scala.collection.mutable.ListBuffer
import scala.concurrent.duration.Duration
import scala.concurrent.{Await, Future}


import scala.concurrent.{ExecutionContext}
import ExecutionContext.Implicits.global


object MyTestObject {

  var listBufferOfInts = new ListBuffer[Int]() // files that are processed

  def runFunction(): Int = {
    listBufferOfInts = new ListBuffer[Int]()
    val inputListOfInts = 1 to 1000
    val fut = Future.traverse(inputListOfInts) { i =>
      Future {
        appendElem(i)
      }
    }
    Await.ready(fut, Duration.Inf)
    listBufferOfInts.length
  }

  def appendElem(elem: Int): Unit = {
    listBufferOfInts ++= List(elem)
  }
}

MyTestObject.runFunction()
MyTestObject.runFunction()
MyTestObject.runFunction()

哪个returns:

res0: Int = 937
res1: Int = 992
res2: Int = 997

显然我希望 1000 一直返回。如何修复我的代码以保留 "architecture" 但使我的 ListBuffer "synchronized" ?

改变

listBufferOfInts ++= List(elem)

synchronized {
    listBufferOfInts ++= List(elem)
}

让它发挥作用。可能会成为性能问题?我仍然对解释感兴趣,也许还有更好的做事方式!

我不确定上面的内容是否正确显示了您正在尝试做的事情。也许问题是您实际上共享了一个在 runFunction 中重新初始化的 var ListBuffer。

当我取出这个时,我正确地收集了我期望的所有事件:

import java.util.Properties

import scala.collection.mutable.ListBuffer
import scala.concurrent.duration.Duration
import scala.concurrent.{ Await, Future }

import scala.concurrent.{ ExecutionContext }
import ExecutionContext.Implicits.global

object BrokenTestObject extends App {

  var listBufferOfInts = ( new ListBuffer[Int]() )

  def runFunction(): Int = {
    val inputListOfInts = 1 to 1000
    val fut = Future.traverse(inputListOfInts) { i =>
      Future {
        appendElem(i)
      }
    }
    Await.ready(fut, Duration.Inf)
    listBufferOfInts.length
  }

  def appendElem(elem: Int): Unit = {
    listBufferOfInts.append( elem )
  }

  BrokenTestObject.runFunction()
  BrokenTestObject.runFunction()
  BrokenTestObject.runFunction()

  println(s"collected ${listBufferOfInts.length} elements")
}

如果你真的有同步问题,你可以使用类似下面的方法:

import java.util.Properties

import scala.collection.mutable.ListBuffer
import scala.concurrent.duration.Duration
import scala.concurrent.{ Await, Future }

import scala.concurrent.{ ExecutionContext }
import ExecutionContext.Implicits.global

class WrappedListBuffer(val lb: ListBuffer[Int]) {
  def append(i: Int) {
    this.synchronized {
      lb.append(i)
    }
  }
}

object MyTestObject extends App {

  var listBufferOfInts = new WrappedListBuffer( new ListBuffer[Int]() )

  def runFunction(): Int = {
    val inputListOfInts = 1 to 1000
    val fut = Future.traverse(inputListOfInts) { i =>
      Future {
        appendElem(i)
      }
    }
    Await.ready(fut, Duration.Inf)
    listBufferOfInts.lb.length
  }

  def appendElem(elem: Int): Unit = {
    listBufferOfInts.append( elem )
  }

  MyTestObject.runFunction()
  MyTestObject.runFunction()
  MyTestObject.runFunction()

  println(s"collected ${listBufferOfInts.lb.size} elements")
}

我不知道确切的问题是什么,正如你所说的你简化了它,但你仍然有一个明显的竞争条件,多个线程修改一个可变集合,这非常糟糕。正如其他答案所指出的那样,您需要一些锁定,以便只有一个线程可以同时修改集合。如果您的计算量很大,以同步方式将结果附加到缓冲区应该不会显着影响性能,但如有疑问,请始终进行测量。

但是不需要同步,您可以做其他事情,而不需要变量和可变状态。让每个 Future return 您的部分结果,然后将它们合并到一个列表中,实际上 Future.traverse 就是这样做的。

import scala.concurrent.duration._
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global

def runFunction: Int = {
  val inputListOfInts = 1 to 1000

  val fut: Future[List[Int]] = Future.traverse(inputListOfInts.toList) { i =>
    Future { 
      // some heavy calculations on i
      i * 4
    }
  }

  val listOfInts = Await.result(fut, Duration.Inf)
  listOfInts.size
}

Future.traverse 已经为您提供了一个包含所有结果的不可变列表,无需将它们附加到可变缓冲区。 不用说,你总会得到 1000 回报。

@ List.fill(10000)(runFunction).exists(_ != 1000) 
res18: Boolean = false