添加后喷射缓存不显示新元素

spray-cache not showing new elements after adding

我的目标是向现有的喷射缓存实例添加元素。如果没有任何明显的原因或错误,这不会发生。

我的方法是使用可用的 scala-future api 获得中间结果和可用的 scala-futures 映射方法(map,foreach)到 "unpack" 由 futures 产生的数据。

这是显示我的方法的代码(它是完整的,可以立即测试):

import akka.actor.ActorSystem
import spray.caching.LruCache

import scala.util.{Failure, Success}

case class Cached(elements: List[Int])

object TestApp extends App {

  //ignore these, just so that cache gets some execution context
  implicit val system = ActorSystem("TestApp")
  implicit def dispatcher = system.dispatcher

  val cache = LruCache[Cached]()
  val cacheKey = "test"

  cache(cacheKey) {
    new Cached(List(1, 2, 3, 4))
  }

  def merge(key: String, mergeList: List[Int]) = {
    // get the existing cache content
    cache.get(key).foreach { cachedFuture =>
      // wait until it is actually returned
      cachedFuture.onComplete {
        // merge extracted elements with mergeList
        case Success(cached) =>
          val mergedCache = cached.elements ++ mergeList

          // before re-constructing cache element, remove it from spray-cache
          cache.remove(key).foreach { removeFuture =>
            // as soon as remove is complete
            removeFuture.onComplete {
              // re-construct cache element
              case Success(_) =>
                // actually reconstructing cache element with the key
                cache(key) { new Cached(mergedCache) }
              case Failure(ex) => println(ex)
            }
          }
        case Failure(ex) => println(ex)
      }
    }
  }

  // merge new element 5
  merge(cacheKey, List(5))
  cache.get(cacheKey).map(_.onComplete {
    case Success(cached) =>
      // new cache should contain 1, 2, 3, 4, 5
      println(s"Cache content is: ${cached.elements.mkString("; ")}")
      sys.exit(0)
    case Failure(ex) => println(ex); sys.exit(0)
  })
}

下面是解决我在上面的代码中遇到的问题的有效解决方案。

我特别采纳了 jrudolph 的一些建议

  • 解决合并调用和后续获取之间的竞争条件
  • 解决忽略 cache.get() 不包含条目的情况
  • 解决合并操作的线程安全问题

我没能解决的是 Await.result(...) 的用法 def merge 与 "outside world".

同步

关于 jrudolph 关于使用 Map 而不是 LruCache 的评论,他基本上是正确的。在我的特定用例中,我选择了 LruCache,因为实际应用程序只将几个键放入缓存中,但 元素:List[Int]包裹在 Cached 中的实际上是一个巨大的复杂集合,如果应用程序在某个时候添加了太多键,它应该被驱逐。

import akka.actor.ActorSystem
import spray.caching.LruCache

import scala.concurrent.{Await, Future}
import scala.concurrent.duration._
import scala.util.{Failure, Success}

case class Cached(elements: List[Int])

object TestApp extends App {

  //ignore these, just so that cache gets some execution context
  implicit val system = ActorSystem("TestApp")
  implicit def dispatcher = system.dispatcher

  val cache = LruCache[Cached]()
  val cacheKey = "test"

  cache(cacheKey) {
    new Cached(List(1, 2, 3, 4))
  }

  def merge(key: String, mergeList: List[Int]) = {
    // get the existing cache content
    val mergeResult: Future[Cached] = cache.get(key) match {
      case Some(cacheFuture) => cacheFuture.flatMap { case cached =>
          recreateCacheKey(key, Cached(cached.elements ++ mergeList))
      }
      case None => recreateCacheKey(key, Cached(mergeList))
    }

    // await result so that users of the def merge get impression that this def is synchronous
    Await.result(mergeResult, 5 seconds)
  }

  // synchronized should eliminate issue with thread-safety of cache modifications
  private def recreateCacheKey(cacheKey: String, newCached: Cached) = synchronized {
      val cacheRecreationFuture = cache.remove(cacheKey)
        .fold(cache(cacheKey) { newCached })(_.flatMap(removedFuture => cache(cacheKey, () => Future { newCached })))
      cacheRecreationFuture
  }

  private def printCacheContent = {
    cache.get(cacheKey).map(_.onComplete {
      case Success(cached) =>
        // new cache should contain 1, 2, 3, 4, 5
        println(s"Cache content is: ${cached.elements.mkString("; ")}")
        sys.exit(0)
      case Failure(ex) => println(ex); sys.exit(0)
    })
  }

  /*****************/
  /* EXECUTE STUFF */
  /*****************/

  // merge new element 5
  merge(cacheKey, List(5))
  printCacheContent
}