关于Future.firstCompletedOf和垃圾回收机制
About Future.firstCompletedOf and Garbage Collect mechanism
我在实际项目中遇到过这个问题,并通过我的测试代码和分析器证明了这一点。我没有粘贴 "tl;dr" 代码,而是向您展示了一张图片,然后对其进行了描述。
简单地说,我正在使用 Future.firstCompletedOf
从 2 Future
中获取结果,两者没有共享的东西并且彼此不关心。尽管,这是我要解决的问题,垃圾收集器无法回收第一个 Result
对象,直到两个 Future
完成 。
所以我很好奇这背后的机制。有人可以从较低的层次解释一下,或者提供一些提示让我研究一下。
谢谢!
PS: 是因为他们共享相同的ExecutionContext
吗?
** 更新 ** 根据要求粘贴测试代码
object Main extends App{
println("Test start")
val timeout = 30000
trait Result {
val id: Int
val str = "I'm short"
}
class BigObject(val id: Int) extends Result{
override val str = "really big str"
}
def guardian = Future({
Thread.sleep(timeout)
new Result { val id = 99999 }
})
def worker(i: Int) = Future({
Thread.sleep(100)
new BigObject(i)
})
for (i <- Range(1, 1000)){
println("round " + i)
Thread.sleep(20)
Future.firstCompletedOf(Seq(
guardian,
worker(i)
)).map( r => println("result" + r.id))
}
while (true){
Thread.sleep(2000)
}
}
让我们看看firstCompletedOf
是如何实现的:
def firstCompletedOf[T](futures: TraversableOnce[Future[T]])(implicit executor: ExecutionContext): Future[T] = {
val p = Promise[T]()
val completeFirst: Try[T] => Unit = p tryComplete _
futures foreach { _ onComplete completeFirst }
p.future
}
做{ futures foreach { _ onComplete completeFirst }
时,函数completeFirst
保存在某处
通过 ExecutionContext.execute
。这个函数到底保存在哪里是无关紧要的,我们只知道它必须保存在某个地方
以便稍后可以在线程可用时选择并在线程池上执行。只有当 future 完成时,才不再需要对 completeFirst
的引用。
因为 completeFirst
结束于 p
,只要还有一个未来(来自 futures
)等待完成,就会有一个对 p
的引用防止它被垃圾收集(即使到那时 firstCompletedOf
可能已经返回,从堆栈中删除 p
)。
当第一个 future 完成时,它将结果保存到 promise 中(通过调用 p.tryComplete
)。
因为 promise p
保存了结果,结果至少在 p
是可达的时候是可达的,正如我们看到的 p
是可达的,只要至少有一个来自 futures
还没有完成。
这就是为什么无法在所有期货完成之前收集结果的原因。
更新:
现在的问题是:它能修好吗?我认为可以。我们所要做的就是确保第一个 future 以线程安全的方式完成 "nulls out" 对 p 的引用,这可以通过使用 AtomicReference 的示例来完成。像这样:
def firstCompletedOf[T](futures: TraversableOnce[Future[T]])(implicit executor: ExecutionContext): Future[T] = {
val p = Promise[T]()
val pref = new java.util.concurrent.atomic.AtomicReference(p)
val completeFirst: Try[T] => Unit = { result: Try[T] =>
val promise = pref.getAndSet(null)
if (promise != null) {
promise.tryComplete(result)
}
}
futures foreach { _ onComplete completeFirst }
p.future
}
我已经对其进行了测试,正如预期的那样,它确实允许在第一个 future 完成后立即对结果进行垃圾回收。它在所有其他方面应该表现相同。
我在实际项目中遇到过这个问题,并通过我的测试代码和分析器证明了这一点。我没有粘贴 "tl;dr" 代码,而是向您展示了一张图片,然后对其进行了描述。
简单地说,我正在使用 Future.firstCompletedOf
从 2 Future
中获取结果,两者没有共享的东西并且彼此不关心。尽管,这是我要解决的问题,垃圾收集器无法回收第一个 Result
对象,直到两个 Future
完成 。
所以我很好奇这背后的机制。有人可以从较低的层次解释一下,或者提供一些提示让我研究一下。
谢谢!
PS: 是因为他们共享相同的ExecutionContext
吗?
** 更新 ** 根据要求粘贴测试代码
object Main extends App{
println("Test start")
val timeout = 30000
trait Result {
val id: Int
val str = "I'm short"
}
class BigObject(val id: Int) extends Result{
override val str = "really big str"
}
def guardian = Future({
Thread.sleep(timeout)
new Result { val id = 99999 }
})
def worker(i: Int) = Future({
Thread.sleep(100)
new BigObject(i)
})
for (i <- Range(1, 1000)){
println("round " + i)
Thread.sleep(20)
Future.firstCompletedOf(Seq(
guardian,
worker(i)
)).map( r => println("result" + r.id))
}
while (true){
Thread.sleep(2000)
}
}
让我们看看firstCompletedOf
是如何实现的:
def firstCompletedOf[T](futures: TraversableOnce[Future[T]])(implicit executor: ExecutionContext): Future[T] = {
val p = Promise[T]()
val completeFirst: Try[T] => Unit = p tryComplete _
futures foreach { _ onComplete completeFirst }
p.future
}
做{ futures foreach { _ onComplete completeFirst }
时,函数completeFirst
保存在某处
通过 ExecutionContext.execute
。这个函数到底保存在哪里是无关紧要的,我们只知道它必须保存在某个地方
以便稍后可以在线程可用时选择并在线程池上执行。只有当 future 完成时,才不再需要对 completeFirst
的引用。
因为 completeFirst
结束于 p
,只要还有一个未来(来自 futures
)等待完成,就会有一个对 p
的引用防止它被垃圾收集(即使到那时 firstCompletedOf
可能已经返回,从堆栈中删除 p
)。
当第一个 future 完成时,它将结果保存到 promise 中(通过调用 p.tryComplete
)。
因为 promise p
保存了结果,结果至少在 p
是可达的时候是可达的,正如我们看到的 p
是可达的,只要至少有一个来自 futures
还没有完成。
这就是为什么无法在所有期货完成之前收集结果的原因。
更新: 现在的问题是:它能修好吗?我认为可以。我们所要做的就是确保第一个 future 以线程安全的方式完成 "nulls out" 对 p 的引用,这可以通过使用 AtomicReference 的示例来完成。像这样:
def firstCompletedOf[T](futures: TraversableOnce[Future[T]])(implicit executor: ExecutionContext): Future[T] = {
val p = Promise[T]()
val pref = new java.util.concurrent.atomic.AtomicReference(p)
val completeFirst: Try[T] => Unit = { result: Try[T] =>
val promise = pref.getAndSet(null)
if (promise != null) {
promise.tryComplete(result)
}
}
futures foreach { _ onComplete completeFirst }
p.future
}
我已经对其进行了测试,正如预期的那样,它确实允许在第一个 future 完成后立即对结果进行垃圾回收。它在所有其他方面应该表现相同。