如何将异步缓存与 Kotlin 协程一起使用?
How do I use an async cache with Kotlin coroutines?
我有一个使用协程的 Kotlin JVM 服务器应用程序,我需要在非阻塞网络调用之前放置一个缓存。我想我可以使用 Caffeine AsyncLoadingCache
to get the non-blocking cache behaviour I need. The AsyncCacheLoader
接口,我需要实现 uses CompletableFuture
。同时,我要调用的加载缓存条目的方法是一个 suspend
函数。
我可以这样弥合差距:
abstract class SuspendingCacheLoader<K, V>: AsyncCacheLoader<K, V> {
abstract suspend fun load(key: K): V
final override fun asyncLoad(key: K, executor: Executor): CompletableFuture<V> {
return GlobalScope.async(executor.asCoroutineDispatcher()) {
load(key)
}.asCompletableFuture()
}
}
这将 运行 提供的 Executor
上的 load
函数(默认情况下,ForkJoinPool
),从 Caffeine 的角度来看,这是正确的行为。
但是,我知道我应该尝试 avoid using GlobalScope to launch coroutines。
我考虑让我的 SuspendingCacheLoader
实现 CoroutineScope
并管理它自己的协程上下文。但是 CoroutineScope
旨在由具有托管生命周期的对象来实现。缓存和 AsyncCacheLoader
都没有任何生命周期挂钩。缓存拥有 Executor
和 CompletableFuture
实例,因此它已经通过这种方式控制了加载任务的生命周期。我看不出让任务由协程上下文拥有会增加什么,我担心在停止使用缓存后我无法正确关闭协程上下文。
编写我自己的异步缓存机制会非常困难,所以如果可以的话我想与 Caffeine 实现集成。
使用 GlobalScope
是实施 AsyncCacheLoader
的正确方法,还是有更好的解决方案?
The cache owns the Executor and the CompletableFuture instances, so it already controls the lifecycle of the loading tasks that way.
这不是真的,Caffeine
上的文档指定它使用用户提供的 Executor
或 ForkJoinPool.commonPool()
(如果提供 none)。这意味着没有默认生命周期。
尽管直接调用 GlobalScope
似乎是错误的解决方案,因为没有理由对选择进行硬编码。只需通过构造函数提供 CoroutineScope
并使用 GlobalScope
作为参数,而您没有缓存绑定的显式生命周期。
经过深思熟虑,我想出了一个更简单的解决方案,我认为它更符合地道地使用协程。
该方法通过使用 AsyncCache.get(key, mappingFunction)
而不是实现 AsyncCacheLoader
来工作。但是,它忽略了缓存配置为使用的 Executor
,遵循此处其他一些答案的建议。
class SuspendingCache<K, V>(private val asyncCache: AsyncCache<K, V>) {
suspend fun get(key: K): V = supervisorScope {
getAsync(key).await()
}
private fun CoroutineScope.getAsync(key: K) = asyncCache.get(key) { k, _ ->
future {
loadValue(k)
}
}
private suspend fun loadValue(key: K): V = TODO("Load the value")
}
请注意,这取决于 kotlinx-coroutines-jdk8
的 future
协程构建器和 await()
函数。
我认为忽略 Executor
可能是正确的选择。正如@Kiskae 指出的那样,缓存将默认使用 ForkJoinPool
。选择使用它而不是默认的协程调度程序可能没有用。但是,如果我们想使用它会很容易,通过更改 getAsync
函数:
private fun CoroutineScope.getAsync(key: K) = asyncCache.get(key) { k, executor ->
future(executor.asCoroutineDispatcher()) {
loadValue(k)
}
}
这是我的解决方案:
定义CoroutineVerticle
的扩展函数
fun <K, V> CoroutineVerticle.buildCache(configurator: Caffeine<Any, Any>.() -> Unit = {}, loader: suspend CoroutineScope.(K) -> V) = Caffeine.newBuilder().apply(configurator).buildAsync { key: K, _ ->
// do not use cache's executor
future {
loader(key)
}
}
在 CoroutineVerticle
内创建我们的缓存
val cache : AsyncLoadingCache<String, String> = buildCache({
maximumSize(10_000)
expireAfterWrite(10, TimeUnit.MINUTES)
}) { key ->
// load data and return it
delay(1000)
"data for key: $key"
}
使用缓存
suspend fun doSomething() {
val data = cache.get('key').await()
val future = cache.get('key2')
val data2 = future.await()
}
这是一个简单的解决方案。将 K、V 符号替换为您的类型。
val cache = Caffeine.newBuilder().buildAsync<K, V> { key: K, _ ->
val future = CompletableFuture<V>()
launch {
val result = someAwaitOperation(key)
future.complete(result)
}
future
}
建议这样的扩展方法
suspend inline fun <K: Any, V: Any> Caffeine<Any, Any>.suspendingLoadingCache(
crossinline suspendedLoader: suspend (key: K) -> V
): AsyncLoadingCache<K, V> =
buildAsync { key, executor: Executor ->
CoroutineScope(executor.asCoroutineDispatcher()).future {
suspendedLoader(key)
}
}
不推荐GlobalScope
,使用CoroutineScope(executor.asCoroutineDispatcher())
future
方法在 kotlinx-coroutines-jdk8
模块中定义
我有一个使用协程的 Kotlin JVM 服务器应用程序,我需要在非阻塞网络调用之前放置一个缓存。我想我可以使用 Caffeine AsyncLoadingCache
to get the non-blocking cache behaviour I need. The AsyncCacheLoader
接口,我需要实现 uses CompletableFuture
。同时,我要调用的加载缓存条目的方法是一个 suspend
函数。
我可以这样弥合差距:
abstract class SuspendingCacheLoader<K, V>: AsyncCacheLoader<K, V> {
abstract suspend fun load(key: K): V
final override fun asyncLoad(key: K, executor: Executor): CompletableFuture<V> {
return GlobalScope.async(executor.asCoroutineDispatcher()) {
load(key)
}.asCompletableFuture()
}
}
这将 运行 提供的 Executor
上的 load
函数(默认情况下,ForkJoinPool
),从 Caffeine 的角度来看,这是正确的行为。
但是,我知道我应该尝试 avoid using GlobalScope to launch coroutines。
我考虑让我的 SuspendingCacheLoader
实现 CoroutineScope
并管理它自己的协程上下文。但是 CoroutineScope
旨在由具有托管生命周期的对象来实现。缓存和 AsyncCacheLoader
都没有任何生命周期挂钩。缓存拥有 Executor
和 CompletableFuture
实例,因此它已经通过这种方式控制了加载任务的生命周期。我看不出让任务由协程上下文拥有会增加什么,我担心在停止使用缓存后我无法正确关闭协程上下文。
编写我自己的异步缓存机制会非常困难,所以如果可以的话我想与 Caffeine 实现集成。
使用 GlobalScope
是实施 AsyncCacheLoader
的正确方法,还是有更好的解决方案?
The cache owns the Executor and the CompletableFuture instances, so it already controls the lifecycle of the loading tasks that way.
这不是真的,Caffeine
上的文档指定它使用用户提供的 Executor
或 ForkJoinPool.commonPool()
(如果提供 none)。这意味着没有默认生命周期。
尽管直接调用 GlobalScope
似乎是错误的解决方案,因为没有理由对选择进行硬编码。只需通过构造函数提供 CoroutineScope
并使用 GlobalScope
作为参数,而您没有缓存绑定的显式生命周期。
经过深思熟虑,我想出了一个更简单的解决方案,我认为它更符合地道地使用协程。
该方法通过使用 AsyncCache.get(key, mappingFunction)
而不是实现 AsyncCacheLoader
来工作。但是,它忽略了缓存配置为使用的 Executor
,遵循此处其他一些答案的建议。
class SuspendingCache<K, V>(private val asyncCache: AsyncCache<K, V>) {
suspend fun get(key: K): V = supervisorScope {
getAsync(key).await()
}
private fun CoroutineScope.getAsync(key: K) = asyncCache.get(key) { k, _ ->
future {
loadValue(k)
}
}
private suspend fun loadValue(key: K): V = TODO("Load the value")
}
请注意,这取决于 kotlinx-coroutines-jdk8
的 future
协程构建器和 await()
函数。
我认为忽略 Executor
可能是正确的选择。正如@Kiskae 指出的那样,缓存将默认使用 ForkJoinPool
。选择使用它而不是默认的协程调度程序可能没有用。但是,如果我们想使用它会很容易,通过更改 getAsync
函数:
private fun CoroutineScope.getAsync(key: K) = asyncCache.get(key) { k, executor ->
future(executor.asCoroutineDispatcher()) {
loadValue(k)
}
}
这是我的解决方案:
定义CoroutineVerticle
fun <K, V> CoroutineVerticle.buildCache(configurator: Caffeine<Any, Any>.() -> Unit = {}, loader: suspend CoroutineScope.(K) -> V) = Caffeine.newBuilder().apply(configurator).buildAsync { key: K, _ ->
// do not use cache's executor
future {
loader(key)
}
}
在 CoroutineVerticle
val cache : AsyncLoadingCache<String, String> = buildCache({
maximumSize(10_000)
expireAfterWrite(10, TimeUnit.MINUTES)
}) { key ->
// load data and return it
delay(1000)
"data for key: $key"
}
使用缓存
suspend fun doSomething() {
val data = cache.get('key').await()
val future = cache.get('key2')
val data2 = future.await()
}
这是一个简单的解决方案。将 K、V 符号替换为您的类型。
val cache = Caffeine.newBuilder().buildAsync<K, V> { key: K, _ ->
val future = CompletableFuture<V>()
launch {
val result = someAwaitOperation(key)
future.complete(result)
}
future
}
建议这样的扩展方法
suspend inline fun <K: Any, V: Any> Caffeine<Any, Any>.suspendingLoadingCache(
crossinline suspendedLoader: suspend (key: K) -> V
): AsyncLoadingCache<K, V> =
buildAsync { key, executor: Executor ->
CoroutineScope(executor.asCoroutineDispatcher()).future {
suspendedLoader(key)
}
}
不推荐GlobalScope
,使用CoroutineScope(executor.asCoroutineDispatcher())
future
方法在 kotlinx-coroutines-jdk8
模块中定义