在 Actor 的接收方法的 Future 中关闭 java.util.concurrent.ConcurrentHashMap?
Closing over java.util.concurrent.ConcurrentHashMap inside a Future of Actor's receive method?
我有一个演员,我想将我的可变状态存储在地图中。
客户端可以向该 actor 发送 Get(key:String)
和 Put(key:String,value:String)
消息。
我正在考虑以下选项。
- 不要在 Actor 的接收方法中使用 futures。如果我有大量
gets/puts
,这可能会对延迟和吞吐量产生负面影响,因为所有操作都将按顺序执行。
- 使用
java.util.concurrent.ConcurrentHashMap
,然后在Future
中调用gets 和puts。
鉴于 java.util.concurrent.ConcurrentHashMap
是线程安全的并且提供者的粒度级别更细,我想知道在为每个放置和获取创建的 Future 中关闭 concurrentHashMap 是否仍然是一个问题。
我知道 close Actor 中 Future 中的可变状态是一个非常糟糕的主意,但我仍然有兴趣知道是否在此特殊情况是否正确?
一般来说,java.util.concurrent.ConcurrentHashMap
是为了并发使用。只要您不尝试将闭包传输到另一台机器,并且考虑并发使用它的含义(例如,如果您读取一个值,使用一个函数修改它,然后再放回去,你想使用 replace(key, oldValue, newValue)
方法来确保它在你进行处理时没有改变?),在 Futures 中应该没问题。
可能有点晚了,但是,在书中 Reactive Web Applications,作者已经指出了这个特定问题的间接方法,如下所示使用 pipeTo。
def receive = {
case ComputeReach(tweetId) =>
fetchRetweets(tweetId, sender()) pipeTo self
case fetchedRetweets: FetchedRetweets =>
followerCountsByRetweet += fetchedRetweets -> List.empty
fetchedRetweets.retweets.foreach { rt =>
userFollowersCounter ! FetchFollowerCount(
fetchedRetweets.tweetId, rt.user
)
}
...
}
其中 followerCountsByRetweet
是 actor 的可变状态。 fetchRetweets()
的结果是一个 Future 被管道传输到与 FetchedRetweets
消息相同的参与者,然后该参与者作用于消息以修改 acto 的状态。这将减轻任何并发操作州
我有一个演员,我想将我的可变状态存储在地图中。
客户端可以向该 actor 发送 Get(key:String)
和 Put(key:String,value:String)
消息。
我正在考虑以下选项。
- 不要在 Actor 的接收方法中使用 futures。如果我有大量
gets/puts
,这可能会对延迟和吞吐量产生负面影响,因为所有操作都将按顺序执行。 - 使用
java.util.concurrent.ConcurrentHashMap
,然后在Future
中调用gets 和puts。
鉴于 java.util.concurrent.ConcurrentHashMap
是线程安全的并且提供者的粒度级别更细,我想知道在为每个放置和获取创建的 Future 中关闭 concurrentHashMap 是否仍然是一个问题。
我知道 close Actor 中 Future 中的可变状态是一个非常糟糕的主意,但我仍然有兴趣知道是否在此特殊情况是否正确?
一般来说,java.util.concurrent.ConcurrentHashMap
是为了并发使用。只要您不尝试将闭包传输到另一台机器,并且考虑并发使用它的含义(例如,如果您读取一个值,使用一个函数修改它,然后再放回去,你想使用 replace(key, oldValue, newValue)
方法来确保它在你进行处理时没有改变?),在 Futures 中应该没问题。
可能有点晚了,但是,在书中 Reactive Web Applications,作者已经指出了这个特定问题的间接方法,如下所示使用 pipeTo。
def receive = {
case ComputeReach(tweetId) =>
fetchRetweets(tweetId, sender()) pipeTo self
case fetchedRetweets: FetchedRetweets =>
followerCountsByRetweet += fetchedRetweets -> List.empty
fetchedRetweets.retweets.foreach { rt =>
userFollowersCounter ! FetchFollowerCount(
fetchedRetweets.tweetId, rt.user
)
}
...
}
其中 followerCountsByRetweet
是 actor 的可变状态。 fetchRetweets()
的结果是一个 Future 被管道传输到与 FetchedRetweets
消息相同的参与者,然后该参与者作用于消息以修改 acto 的状态。这将减轻任何并发操作州