ConcurrentHashMap[String, AtomicInteger] 或 ConcurrentHashMap[String, Int] 用于线程安全计数器?

ConcurrentHashMap[String, AtomicInteger] or ConcurrentHashMap[String, Int] for thread-safe counters?

ConcurrentHashMap 中通过键递增并发计数器时,使用常规 Int 作为值是否安全,还是我们必须使用 AtomicInteger?例如考虑以下两个实现

ConcurrentHashMap[String, Int]

final class ExpensiveMetrics(implicit system: ActorSystem, ec: ExecutionContext) {
  import scala.collection.JavaConverters._
  private val chm = new ConcurrentHashMap[String, Int]().asScala

  system.scheduler.schedule(5.seconds, 60.seconds)(publishAllMetrics())

  def countRequest(key: String): Unit =
    chm.get(key) match {
      case Some(value) => chm.update(key, value + 1)
      case None => chm.update(key, 1)
    }

  private def resetCount(key: String) = chm.replace(key, 0)

  private def publishAllMetrics(): Unit =
    chm foreach { case (key, value) =>
      // publishMetric(key, value.doubleValue())
      resetCount(key)
    }
}

ConcurrentHashMap[String, AtomicInteger]

final class ExpensiveMetrics(implicit system: ActorSystem, ec: ExecutionContext) {
  import scala.collection.JavaConverters._
  private val chm = new ConcurrentHashMap[String, AtomicInteger]().asScala

  system.scheduler.schedule(5.seconds, 60.seconds)(publishAllMetrics())

  def countRequest(key: String): Unit =
    chm.getOrElseUpdate(key, new AtomicInteger(1)).incrementAndGet()
  
  private def resetCount(key: String): Unit =
    chm.getOrElseUpdate(key, new AtomicInteger(0)).set(0)

  private def publishAllMetrics(): Unit =
    chm foreach { case (key, value) =>
      // publishMetric(key, value.doubleValue())
      resetCount(key)
    }
}

前一种实现安全吗?如果不是,在代码片段中的什么位置可以引入竞争条件?为什么?


问题的上下文是 AWS CloudWatch 指标,如果在每个请求上发布,这些指标在高频 API 上会变得非常昂贵。所以我想把它们“分批”并定期发布。

第一个实现不正确,因为 countRequest 方法不是原子的。考虑以下事件序列:

  • 线程 A 和 B 都使用键 "foo"
  • 调用 countRequest
  • 线程A获取计数器值,暂且称它为x
  • 线程B获取计数器值。它是相同的值 x,因为线程 A 尚未更新计数器。
  • 线程 B 使用新的计数器值更新映射,x+1
  • 线程A更新map,由于在B写入新的计数器值之前获得了计数器值,所以它也写入了x+1。

计数器应该是x+2,但它是x+1。这是一个典型的丢失更新问题。

由于使用了 getOrElseUpdate 方法,第二个实现也有类似的问题。 `ConcurrentHashMap` 没有那个方法,因此 Scala 包装器需要模拟它。我认为实现是继承自 `scala.collection.mutable.MapOps` 的实现,它的定义如下: ``` def getOrElseUpdate(key: K, op: => V): V = 获取(密钥)匹配{ 案例 Some(v) => v case None => val d = op;这个(键)= d; d } ``` 这显然不是原子的。

要正确实施,请在 ConcurrentHashMap 上使用 compute 方法。

此方法将自动执行,因此您不需要 AtomicInteger