如何在 RxJava 中缓存项目并避免缓存踩踏?
How can I cache items in RxJava and avoid cache stampede?
假设我有以下代码:
Entity getEntity(GUID entityId) {
Entity entity = entityLRUCache.get(entityId);
if (entity == null) {
entity = longLoadFromDatabase(entityId);
entityLRUCache.put(entityId, entity);
}
return entity;
}
我可以用Caffeine框架来解决缓存踩踏:
Entity getEntity(GUID entityId) {
return entityCache.get(entityId, this::longLoadFromDatabase);
}
但是现在方法 longLoadFromDatabase
returns 一个 Single<Entity>
而不是实体本身(get
的第二个参数是来自 int -> Entity
的映射器),所以以前的解决方案将不再有效。
您可以使用 SingleSubject
作为并发映射中的占位符:
ConcurrentMap<GUID, SingleSubject<Entity>> map = ...
public Single<Entity> getEntity(GUID guid) {
SingleSubject<Entity> e = map.get(guid);
if (e == null) {
e = SingleSubject.create();
SingleSubject<Entity> f = map.putIfAbsent(guid, e);
if (f == null) {
longLoadFromDatabase(guid).subscribe(e);
} else {
e = f;
}
}
return e;
}
将@akarnokd 的建议与 Caffeine 的异步支持相结合,您可以将 AsyncLoadingCache 与 Rx 的转换器一起使用。
AsyncLoadingCache<Integer, Entity> cache = Caffeine.newBuilder()
.buildAsync((key, executor) -> SingleInterop.get()
.apply(longLoadFromDatabase(key)).toCompletableFuture());
...
return SingleInterop.fromFuture(cache.get(123));
如果 future 因异常而失败或解析为空值,缓存将自动删除该条目。
假设我有以下代码:
Entity getEntity(GUID entityId) {
Entity entity = entityLRUCache.get(entityId);
if (entity == null) {
entity = longLoadFromDatabase(entityId);
entityLRUCache.put(entityId, entity);
}
return entity;
}
Entity getEntity(GUID entityId) {
return entityCache.get(entityId, this::longLoadFromDatabase);
}
但是现在方法 longLoadFromDatabase
returns 一个 Single<Entity>
而不是实体本身(get
的第二个参数是来自 int -> Entity
的映射器),所以以前的解决方案将不再有效。
您可以使用 SingleSubject
作为并发映射中的占位符:
ConcurrentMap<GUID, SingleSubject<Entity>> map = ...
public Single<Entity> getEntity(GUID guid) {
SingleSubject<Entity> e = map.get(guid);
if (e == null) {
e = SingleSubject.create();
SingleSubject<Entity> f = map.putIfAbsent(guid, e);
if (f == null) {
longLoadFromDatabase(guid).subscribe(e);
} else {
e = f;
}
}
return e;
}
将@akarnokd 的建议与 Caffeine 的异步支持相结合,您可以将 AsyncLoadingCache 与 Rx 的转换器一起使用。
AsyncLoadingCache<Integer, Entity> cache = Caffeine.newBuilder()
.buildAsync((key, executor) -> SingleInterop.get()
.apply(longLoadFromDatabase(key)).toCompletableFuture());
...
return SingleInterop.fromFuture(cache.get(123));
如果 future 因异常而失败或解析为空值,缓存将自动删除该条目。