在 Actor 的接收方法的 Future 中关闭 java.util.concurrent.ConcurrentHashMap?

Closing over java.util.concurrent.ConcurrentHashMap inside a Future of Actor's receive method?

我有一个演员,我想将我的可变状态存储在地图中。

客户端可以向该 actor 发送 Get(key:String)Put(key:String,value:String) 消息。

我正在考虑以下选项。

  1. 不要在 Actor 的接收方法中使用 futures。如果我有大量 gets/puts,这可能会对延迟和吞吐量产生负面影响,因为所有操作都将按顺序执行。
  2. 使用java.util.concurrent.ConcurrentHashMap,然后在Future 中调用gets 和puts。

鉴于 java.util.concurrent.ConcurrentHashMap 是线程安全的并且提供者的粒度级别更细,我想知道在为每个放置和获取创建的 Future 中关闭 concurrentHashMap 是否仍然是一个问题。

我知道 close Actor 中 Future 中的可变状态是一个非常糟糕的主意,但我仍然有兴趣知道是否在此特殊情况是否正确?

一般来说,java.util.concurrent.ConcurrentHashMap是为了并发使用。只要您不尝试将闭包传输到另一台机器,并且考虑并发使用它的含义(例如,如果您读取一个值,使用一个函数修改它,然后再放回去,你想使用 replace(key, oldValue, newValue) 方法来确保它在你进行处理时没有改变?),在 Futures 中应该没问题。

可能有点晚了,但是,在书中 Reactive Web Applications,作者已经指出了这个特定问题的间接方法,如下所示使用 pipeTo。

def receive = {
  case ComputeReach(tweetId) =>
    fetchRetweets(tweetId, sender()) pipeTo self
  case fetchedRetweets: FetchedRetweets =>
    followerCountsByRetweet += fetchedRetweets -> List.empty
    fetchedRetweets.retweets.foreach { rt =>
      userFollowersCounter ! FetchFollowerCount(
       fetchedRetweets.tweetId, rt.user
    )
}
...
}

其中 followerCountsByRetweet 是 actor 的可变状态。 fetchRetweets() 的结果是一个 Future 被管道传输到与 FetchedRetweets 消息相同的参与者,然后该参与者作用于消息以修改 acto 的状态。这将减轻任何并发操作州