添加后喷射缓存不显示新元素
spray-cache not showing new elements after adding
我的目标是向现有的喷射缓存实例添加元素。如果没有任何明显的原因或错误,这不会发生。
我的方法是使用可用的 scala-future api 获得中间结果和可用的 scala-futures 映射方法(map,foreach)到 "unpack" 由 futures 产生的数据。
这是显示我的方法的代码(它是完整的,可以立即测试):
import akka.actor.ActorSystem
import spray.caching.LruCache
import scala.util.{Failure, Success}
case class Cached(elements: List[Int])
object TestApp extends App {
//ignore these, just so that cache gets some execution context
implicit val system = ActorSystem("TestApp")
implicit def dispatcher = system.dispatcher
val cache = LruCache[Cached]()
val cacheKey = "test"
cache(cacheKey) {
new Cached(List(1, 2, 3, 4))
}
def merge(key: String, mergeList: List[Int]) = {
// get the existing cache content
cache.get(key).foreach { cachedFuture =>
// wait until it is actually returned
cachedFuture.onComplete {
// merge extracted elements with mergeList
case Success(cached) =>
val mergedCache = cached.elements ++ mergeList
// before re-constructing cache element, remove it from spray-cache
cache.remove(key).foreach { removeFuture =>
// as soon as remove is complete
removeFuture.onComplete {
// re-construct cache element
case Success(_) =>
// actually reconstructing cache element with the key
cache(key) { new Cached(mergedCache) }
case Failure(ex) => println(ex)
}
}
case Failure(ex) => println(ex)
}
}
}
// merge new element 5
merge(cacheKey, List(5))
cache.get(cacheKey).map(_.onComplete {
case Success(cached) =>
// new cache should contain 1, 2, 3, 4, 5
println(s"Cache content is: ${cached.elements.mkString("; ")}")
sys.exit(0)
case Failure(ex) => println(ex); sys.exit(0)
})
}
下面是解决我在上面的代码中遇到的问题的有效解决方案。
我特别采纳了 jrudolph 的一些建议
- 解决合并调用和后续获取之间的竞争条件
- 解决忽略 cache.get() 不包含条目的情况
- 解决合并操作的线程安全问题
我没能解决的是 Await.result(...) 的用法 def merge 与 "outside world".
同步
关于 jrudolph 关于使用 Map 而不是 LruCache 的评论,他基本上是正确的。在我的特定用例中,我选择了 LruCache,因为实际应用程序只将几个键放入缓存中,但 元素:List[Int]包裹在 Cached 中的实际上是一个巨大的复杂集合,如果应用程序在某个时候添加了太多键,它应该被驱逐。
import akka.actor.ActorSystem
import spray.caching.LruCache
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._
import scala.util.{Failure, Success}
case class Cached(elements: List[Int])
object TestApp extends App {
//ignore these, just so that cache gets some execution context
implicit val system = ActorSystem("TestApp")
implicit def dispatcher = system.dispatcher
val cache = LruCache[Cached]()
val cacheKey = "test"
cache(cacheKey) {
new Cached(List(1, 2, 3, 4))
}
def merge(key: String, mergeList: List[Int]) = {
// get the existing cache content
val mergeResult: Future[Cached] = cache.get(key) match {
case Some(cacheFuture) => cacheFuture.flatMap { case cached =>
recreateCacheKey(key, Cached(cached.elements ++ mergeList))
}
case None => recreateCacheKey(key, Cached(mergeList))
}
// await result so that users of the def merge get impression that this def is synchronous
Await.result(mergeResult, 5 seconds)
}
// synchronized should eliminate issue with thread-safety of cache modifications
private def recreateCacheKey(cacheKey: String, newCached: Cached) = synchronized {
val cacheRecreationFuture = cache.remove(cacheKey)
.fold(cache(cacheKey) { newCached })(_.flatMap(removedFuture => cache(cacheKey, () => Future { newCached })))
cacheRecreationFuture
}
private def printCacheContent = {
cache.get(cacheKey).map(_.onComplete {
case Success(cached) =>
// new cache should contain 1, 2, 3, 4, 5
println(s"Cache content is: ${cached.elements.mkString("; ")}")
sys.exit(0)
case Failure(ex) => println(ex); sys.exit(0)
})
}
/*****************/
/* EXECUTE STUFF */
/*****************/
// merge new element 5
merge(cacheKey, List(5))
printCacheContent
}
我的目标是向现有的喷射缓存实例添加元素。如果没有任何明显的原因或错误,这不会发生。
我的方法是使用可用的 scala-future api 获得中间结果和可用的 scala-futures 映射方法(map,foreach)到 "unpack" 由 futures 产生的数据。
这是显示我的方法的代码(它是完整的,可以立即测试):
import akka.actor.ActorSystem
import spray.caching.LruCache
import scala.util.{Failure, Success}
case class Cached(elements: List[Int])
object TestApp extends App {
//ignore these, just so that cache gets some execution context
implicit val system = ActorSystem("TestApp")
implicit def dispatcher = system.dispatcher
val cache = LruCache[Cached]()
val cacheKey = "test"
cache(cacheKey) {
new Cached(List(1, 2, 3, 4))
}
def merge(key: String, mergeList: List[Int]) = {
// get the existing cache content
cache.get(key).foreach { cachedFuture =>
// wait until it is actually returned
cachedFuture.onComplete {
// merge extracted elements with mergeList
case Success(cached) =>
val mergedCache = cached.elements ++ mergeList
// before re-constructing cache element, remove it from spray-cache
cache.remove(key).foreach { removeFuture =>
// as soon as remove is complete
removeFuture.onComplete {
// re-construct cache element
case Success(_) =>
// actually reconstructing cache element with the key
cache(key) { new Cached(mergedCache) }
case Failure(ex) => println(ex)
}
}
case Failure(ex) => println(ex)
}
}
}
// merge new element 5
merge(cacheKey, List(5))
cache.get(cacheKey).map(_.onComplete {
case Success(cached) =>
// new cache should contain 1, 2, 3, 4, 5
println(s"Cache content is: ${cached.elements.mkString("; ")}")
sys.exit(0)
case Failure(ex) => println(ex); sys.exit(0)
})
}
下面是解决我在上面的代码中遇到的问题的有效解决方案。
我特别采纳了 jrudolph 的一些建议
- 解决合并调用和后续获取之间的竞争条件
- 解决忽略 cache.get() 不包含条目的情况
- 解决合并操作的线程安全问题
我没能解决的是 Await.result(...) 的用法 def merge 与 "outside world".
同步关于 jrudolph 关于使用 Map 而不是 LruCache 的评论,他基本上是正确的。在我的特定用例中,我选择了 LruCache,因为实际应用程序只将几个键放入缓存中,但 元素:List[Int]包裹在 Cached 中的实际上是一个巨大的复杂集合,如果应用程序在某个时候添加了太多键,它应该被驱逐。
import akka.actor.ActorSystem
import spray.caching.LruCache
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._
import scala.util.{Failure, Success}
case class Cached(elements: List[Int])
object TestApp extends App {
//ignore these, just so that cache gets some execution context
implicit val system = ActorSystem("TestApp")
implicit def dispatcher = system.dispatcher
val cache = LruCache[Cached]()
val cacheKey = "test"
cache(cacheKey) {
new Cached(List(1, 2, 3, 4))
}
def merge(key: String, mergeList: List[Int]) = {
// get the existing cache content
val mergeResult: Future[Cached] = cache.get(key) match {
case Some(cacheFuture) => cacheFuture.flatMap { case cached =>
recreateCacheKey(key, Cached(cached.elements ++ mergeList))
}
case None => recreateCacheKey(key, Cached(mergeList))
}
// await result so that users of the def merge get impression that this def is synchronous
Await.result(mergeResult, 5 seconds)
}
// synchronized should eliminate issue with thread-safety of cache modifications
private def recreateCacheKey(cacheKey: String, newCached: Cached) = synchronized {
val cacheRecreationFuture = cache.remove(cacheKey)
.fold(cache(cacheKey) { newCached })(_.flatMap(removedFuture => cache(cacheKey, () => Future { newCached })))
cacheRecreationFuture
}
private def printCacheContent = {
cache.get(cacheKey).map(_.onComplete {
case Success(cached) =>
// new cache should contain 1, 2, 3, 4, 5
println(s"Cache content is: ${cached.elements.mkString("; ")}")
sys.exit(0)
case Failure(ex) => println(ex); sys.exit(0)
})
}
/*****************/
/* EXECUTE STUFF */
/*****************/
// merge new element 5
merge(cacheKey, List(5))
printCacheContent
}