ConcurrentHashMap[String, AtomicInteger] 或 ConcurrentHashMap[String, Int] 用于线程安全计数器?
ConcurrentHashMap[String, AtomicInteger] or ConcurrentHashMap[String, Int] for thread-safe counters?
在 ConcurrentHashMap
中通过键递增并发计数器时,使用常规 Int
作为值是否安全,还是我们必须使用 AtomicInteger
?例如考虑以下两个实现
ConcurrentHashMap[String, Int]
final class ExpensiveMetrics(implicit system: ActorSystem, ec: ExecutionContext) {
import scala.collection.JavaConverters._
private val chm = new ConcurrentHashMap[String, Int]().asScala
system.scheduler.schedule(5.seconds, 60.seconds)(publishAllMetrics())
def countRequest(key: String): Unit =
chm.get(key) match {
case Some(value) => chm.update(key, value + 1)
case None => chm.update(key, 1)
}
private def resetCount(key: String) = chm.replace(key, 0)
private def publishAllMetrics(): Unit =
chm foreach { case (key, value) =>
// publishMetric(key, value.doubleValue())
resetCount(key)
}
}
ConcurrentHashMap[String, AtomicInteger]
final class ExpensiveMetrics(implicit system: ActorSystem, ec: ExecutionContext) {
import scala.collection.JavaConverters._
private val chm = new ConcurrentHashMap[String, AtomicInteger]().asScala
system.scheduler.schedule(5.seconds, 60.seconds)(publishAllMetrics())
def countRequest(key: String): Unit =
chm.getOrElseUpdate(key, new AtomicInteger(1)).incrementAndGet()
private def resetCount(key: String): Unit =
chm.getOrElseUpdate(key, new AtomicInteger(0)).set(0)
private def publishAllMetrics(): Unit =
chm foreach { case (key, value) =>
// publishMetric(key, value.doubleValue())
resetCount(key)
}
}
前一种实现安全吗?如果不是,在代码片段中的什么位置可以引入竞争条件?为什么?
问题的上下文是 AWS CloudWatch 指标,如果在每个请求上发布,这些指标在高频 API 上会变得非常昂贵。所以我想把它们“分批”并定期发布。
第一个实现不正确,因为 countRequest
方法不是原子的。考虑以下事件序列:
- 线程 A 和 B 都使用键
"foo"
调用 countRequest
- 线程A获取计数器值,暂且称它为x
- 线程B获取计数器值。它是相同的值 x,因为线程 A 尚未更新计数器。
- 线程 B 使用新的计数器值更新映射,x+1
- 线程A更新map,由于在B写入新的计数器值之前获得了计数器值,所以它也写入了x+1。
计数器应该是x+2,但它是x+1。这是一个典型的丢失更新问题。
由于使用了 getOrElseUpdate 方法,第二个实现也有类似的问题。 `ConcurrentHashMap` 没有那个方法,因此 Scala 包装器需要模拟它。我认为实现是继承自 `scala.collection.mutable.MapOps` 的实现,它的定义如下:
```
def getOrElseUpdate(key: K, op: => V): V =
获取(密钥)匹配{
案例 Some(v) => v
case None => val d = op;这个(键)= d; d
}
```
这显然不是原子的。
要正确实施,请在 ConcurrentHashMap
上使用 compute
方法。
此方法将自动执行,因此您不需要 AtomicInteger
。
在 ConcurrentHashMap
中通过键递增并发计数器时,使用常规 Int
作为值是否安全,还是我们必须使用 AtomicInteger
?例如考虑以下两个实现
ConcurrentHashMap[String, Int]
final class ExpensiveMetrics(implicit system: ActorSystem, ec: ExecutionContext) {
import scala.collection.JavaConverters._
private val chm = new ConcurrentHashMap[String, Int]().asScala
system.scheduler.schedule(5.seconds, 60.seconds)(publishAllMetrics())
def countRequest(key: String): Unit =
chm.get(key) match {
case Some(value) => chm.update(key, value + 1)
case None => chm.update(key, 1)
}
private def resetCount(key: String) = chm.replace(key, 0)
private def publishAllMetrics(): Unit =
chm foreach { case (key, value) =>
// publishMetric(key, value.doubleValue())
resetCount(key)
}
}
ConcurrentHashMap[String, AtomicInteger]
final class ExpensiveMetrics(implicit system: ActorSystem, ec: ExecutionContext) {
import scala.collection.JavaConverters._
private val chm = new ConcurrentHashMap[String, AtomicInteger]().asScala
system.scheduler.schedule(5.seconds, 60.seconds)(publishAllMetrics())
def countRequest(key: String): Unit =
chm.getOrElseUpdate(key, new AtomicInteger(1)).incrementAndGet()
private def resetCount(key: String): Unit =
chm.getOrElseUpdate(key, new AtomicInteger(0)).set(0)
private def publishAllMetrics(): Unit =
chm foreach { case (key, value) =>
// publishMetric(key, value.doubleValue())
resetCount(key)
}
}
前一种实现安全吗?如果不是,在代码片段中的什么位置可以引入竞争条件?为什么?
问题的上下文是 AWS CloudWatch 指标,如果在每个请求上发布,这些指标在高频 API 上会变得非常昂贵。所以我想把它们“分批”并定期发布。
第一个实现不正确,因为 countRequest
方法不是原子的。考虑以下事件序列:
- 线程 A 和 B 都使用键
"foo"
调用 - 线程A获取计数器值,暂且称它为x
- 线程B获取计数器值。它是相同的值 x,因为线程 A 尚未更新计数器。
- 线程 B 使用新的计数器值更新映射,x+1
- 线程A更新map,由于在B写入新的计数器值之前获得了计数器值,所以它也写入了x+1。
countRequest
计数器应该是x+2,但它是x+1。这是一个典型的丢失更新问题。
要正确实施,请在 ConcurrentHashMap
上使用 compute
方法。
此方法将自动执行,因此您不需要 AtomicInteger
。