Scala:使用 ListBuffer 附加的并行执行不会产生预期的结果
Scala: Parallel execution with ListBuffer appends doesn't produce expected outcome
我知道我在 mutable.ListBuffer
上做错了,但我不知道如何解决它(以及对问题的正确解释)。
我简化了下面的代码以重现该行为。
我基本上是在尝试 运行 并行运行,以便在处理第一个列表时将元素添加到列表中。我最终得到 "losing" 个元素。
import java.util.Properties
import scala.collection.mutable.ListBuffer
import scala.concurrent.duration.Duration
import scala.concurrent.{Await, Future}
import scala.concurrent.{ExecutionContext}
import ExecutionContext.Implicits.global
object MyTestObject {
var listBufferOfInts = new ListBuffer[Int]() // files that are processed
def runFunction(): Int = {
listBufferOfInts = new ListBuffer[Int]()
val inputListOfInts = 1 to 1000
val fut = Future.traverse(inputListOfInts) { i =>
Future {
appendElem(i)
}
}
Await.ready(fut, Duration.Inf)
listBufferOfInts.length
}
def appendElem(elem: Int): Unit = {
listBufferOfInts ++= List(elem)
}
}
MyTestObject.runFunction()
MyTestObject.runFunction()
MyTestObject.runFunction()
哪个returns:
res0: Int = 937
res1: Int = 992
res2: Int = 997
显然我希望 1000
一直返回。如何修复我的代码以保留 "architecture" 但使我的 ListBuffer "synchronized" ?
改变
listBufferOfInts ++= List(elem)
到
synchronized {
listBufferOfInts ++= List(elem)
}
让它发挥作用。可能会成为性能问题?我仍然对解释感兴趣,也许还有更好的做事方式!
我不确定上面的内容是否正确显示了您正在尝试做的事情。也许问题是您实际上共享了一个在 runFunction 中重新初始化的 var ListBuffer。
当我取出这个时,我正确地收集了我期望的所有事件:
import java.util.Properties
import scala.collection.mutable.ListBuffer
import scala.concurrent.duration.Duration
import scala.concurrent.{ Await, Future }
import scala.concurrent.{ ExecutionContext }
import ExecutionContext.Implicits.global
object BrokenTestObject extends App {
var listBufferOfInts = ( new ListBuffer[Int]() )
def runFunction(): Int = {
val inputListOfInts = 1 to 1000
val fut = Future.traverse(inputListOfInts) { i =>
Future {
appendElem(i)
}
}
Await.ready(fut, Duration.Inf)
listBufferOfInts.length
}
def appendElem(elem: Int): Unit = {
listBufferOfInts.append( elem )
}
BrokenTestObject.runFunction()
BrokenTestObject.runFunction()
BrokenTestObject.runFunction()
println(s"collected ${listBufferOfInts.length} elements")
}
如果你真的有同步问题,你可以使用类似下面的方法:
import java.util.Properties
import scala.collection.mutable.ListBuffer
import scala.concurrent.duration.Duration
import scala.concurrent.{ Await, Future }
import scala.concurrent.{ ExecutionContext }
import ExecutionContext.Implicits.global
class WrappedListBuffer(val lb: ListBuffer[Int]) {
def append(i: Int) {
this.synchronized {
lb.append(i)
}
}
}
object MyTestObject extends App {
var listBufferOfInts = new WrappedListBuffer( new ListBuffer[Int]() )
def runFunction(): Int = {
val inputListOfInts = 1 to 1000
val fut = Future.traverse(inputListOfInts) { i =>
Future {
appendElem(i)
}
}
Await.ready(fut, Duration.Inf)
listBufferOfInts.lb.length
}
def appendElem(elem: Int): Unit = {
listBufferOfInts.append( elem )
}
MyTestObject.runFunction()
MyTestObject.runFunction()
MyTestObject.runFunction()
println(s"collected ${listBufferOfInts.lb.size} elements")
}
我不知道确切的问题是什么,正如你所说的你简化了它,但你仍然有一个明显的竞争条件,多个线程修改一个可变集合,这非常糟糕。正如其他答案所指出的那样,您需要一些锁定,以便只有一个线程可以同时修改集合。如果您的计算量很大,以同步方式将结果附加到缓冲区应该不会显着影响性能,但如有疑问,请始终进行测量。
但是不需要同步,您可以做其他事情,而不需要变量和可变状态。让每个 Future
return 您的部分结果,然后将它们合并到一个列表中,实际上 Future.traverse
就是这样做的。
import scala.concurrent.duration._
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
def runFunction: Int = {
val inputListOfInts = 1 to 1000
val fut: Future[List[Int]] = Future.traverse(inputListOfInts.toList) { i =>
Future {
// some heavy calculations on i
i * 4
}
}
val listOfInts = Await.result(fut, Duration.Inf)
listOfInts.size
}
Future.traverse
已经为您提供了一个包含所有结果的不可变列表,无需将它们附加到可变缓冲区。
不用说,你总会得到 1000
回报。
@ List.fill(10000)(runFunction).exists(_ != 1000)
res18: Boolean = false
我知道我在 mutable.ListBuffer
上做错了,但我不知道如何解决它(以及对问题的正确解释)。
我简化了下面的代码以重现该行为。
我基本上是在尝试 运行 并行运行,以便在处理第一个列表时将元素添加到列表中。我最终得到 "losing" 个元素。
import java.util.Properties
import scala.collection.mutable.ListBuffer
import scala.concurrent.duration.Duration
import scala.concurrent.{Await, Future}
import scala.concurrent.{ExecutionContext}
import ExecutionContext.Implicits.global
object MyTestObject {
var listBufferOfInts = new ListBuffer[Int]() // files that are processed
def runFunction(): Int = {
listBufferOfInts = new ListBuffer[Int]()
val inputListOfInts = 1 to 1000
val fut = Future.traverse(inputListOfInts) { i =>
Future {
appendElem(i)
}
}
Await.ready(fut, Duration.Inf)
listBufferOfInts.length
}
def appendElem(elem: Int): Unit = {
listBufferOfInts ++= List(elem)
}
}
MyTestObject.runFunction()
MyTestObject.runFunction()
MyTestObject.runFunction()
哪个returns:
res0: Int = 937
res1: Int = 992
res2: Int = 997
显然我希望 1000
一直返回。如何修复我的代码以保留 "architecture" 但使我的 ListBuffer "synchronized" ?
改变
listBufferOfInts ++= List(elem)
到
synchronized {
listBufferOfInts ++= List(elem)
}
让它发挥作用。可能会成为性能问题?我仍然对解释感兴趣,也许还有更好的做事方式!
我不确定上面的内容是否正确显示了您正在尝试做的事情。也许问题是您实际上共享了一个在 runFunction 中重新初始化的 var ListBuffer。
当我取出这个时,我正确地收集了我期望的所有事件:
import java.util.Properties
import scala.collection.mutable.ListBuffer
import scala.concurrent.duration.Duration
import scala.concurrent.{ Await, Future }
import scala.concurrent.{ ExecutionContext }
import ExecutionContext.Implicits.global
object BrokenTestObject extends App {
var listBufferOfInts = ( new ListBuffer[Int]() )
def runFunction(): Int = {
val inputListOfInts = 1 to 1000
val fut = Future.traverse(inputListOfInts) { i =>
Future {
appendElem(i)
}
}
Await.ready(fut, Duration.Inf)
listBufferOfInts.length
}
def appendElem(elem: Int): Unit = {
listBufferOfInts.append( elem )
}
BrokenTestObject.runFunction()
BrokenTestObject.runFunction()
BrokenTestObject.runFunction()
println(s"collected ${listBufferOfInts.length} elements")
}
如果你真的有同步问题,你可以使用类似下面的方法:
import java.util.Properties
import scala.collection.mutable.ListBuffer
import scala.concurrent.duration.Duration
import scala.concurrent.{ Await, Future }
import scala.concurrent.{ ExecutionContext }
import ExecutionContext.Implicits.global
class WrappedListBuffer(val lb: ListBuffer[Int]) {
def append(i: Int) {
this.synchronized {
lb.append(i)
}
}
}
object MyTestObject extends App {
var listBufferOfInts = new WrappedListBuffer( new ListBuffer[Int]() )
def runFunction(): Int = {
val inputListOfInts = 1 to 1000
val fut = Future.traverse(inputListOfInts) { i =>
Future {
appendElem(i)
}
}
Await.ready(fut, Duration.Inf)
listBufferOfInts.lb.length
}
def appendElem(elem: Int): Unit = {
listBufferOfInts.append( elem )
}
MyTestObject.runFunction()
MyTestObject.runFunction()
MyTestObject.runFunction()
println(s"collected ${listBufferOfInts.lb.size} elements")
}
我不知道确切的问题是什么,正如你所说的你简化了它,但你仍然有一个明显的竞争条件,多个线程修改一个可变集合,这非常糟糕。正如其他答案所指出的那样,您需要一些锁定,以便只有一个线程可以同时修改集合。如果您的计算量很大,以同步方式将结果附加到缓冲区应该不会显着影响性能,但如有疑问,请始终进行测量。
但是不需要同步,您可以做其他事情,而不需要变量和可变状态。让每个 Future
return 您的部分结果,然后将它们合并到一个列表中,实际上 Future.traverse
就是这样做的。
import scala.concurrent.duration._
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
def runFunction: Int = {
val inputListOfInts = 1 to 1000
val fut: Future[List[Int]] = Future.traverse(inputListOfInts.toList) { i =>
Future {
// some heavy calculations on i
i * 4
}
}
val listOfInts = Await.result(fut, Duration.Inf)
listOfInts.size
}
Future.traverse
已经为您提供了一个包含所有结果的不可变列表,无需将它们附加到可变缓冲区。
不用说,你总会得到 1000
回报。
@ List.fill(10000)(runFunction).exists(_ != 1000)
res18: Boolean = false