RxJava 缓存流中网络调用 ID 的结果(Redis 或类似的缓存解决方案)
RxJava Caching Results of Network Call IDs from a Stream (Redis or similar caching solution)
我需要一些关于 RxJava 的帮助。我有一个昂贵的网络调用,其中 returns 一个 Observable(来自 elasticsearch 的广告流)。我想将每个发出的项目 (Advert) 的 ID 属性 缓存 10 分钟(在 Redis 中),以便接下来的 10 分钟内的后续调用使用 Cache 中的 ID 从 Elasticsearch 中获取广告。
我有一些代码 - 这在某种程度上实现了预期的结果,(归功于以下博客 ..
http://blog.danlew.net/2015/06/22/loading-data-from-multiple-sources-with-rxjava/)
它可以缓存流中的每个发出的项目,我需要的是将流中这些项目的所有 ID 缓存为 1 个缓存条目
到目前为止的代码在这里 https://github.com/tonymurphy/rxjava 对于任何感兴趣的人,下面的代码片段
@Component
public class CachingObservable {
private final Logger logger = LoggerFactory.getLogger(CachingObservable.class);
@Autowired
private AdvertService advertService;
// Each "network" response is different
private Long requestNumber = 0L;
public Observable<Advert> getAdverts(final String location) {
Observable<Advert> memory = memory(location);
Observable<Advert> network = network(location);
Observable<Advert> networkWithSave = network.doOnNext(new Action1<Advert>() {
@Override
public void call(Advert advert) {
List<Long> ids = new ArrayList<Long>();
ids.add(advert.getId());
advertService.cache(location, ids);
}
});
// Retrieve the first source with data - concat checks in order
Observable<Advert> source = Observable.concat(memory,
networkWithSave)
.first();
return source;
}
根据我的理解,concat 方法对我的用例并不是很有用。我需要知道 if/when 网络可观察完成,我需要获取返回的广告 ID 列表,我需要将它们存储在缓存中。我可以订阅网络 observable - 但我希望它是惰性的 - 只有在缓存中找不到数据时才会调用。所以以下更新的代码不起作用..任何想法表示赞赏
public Observable<Advert> getAdverts(final String location) {
Observable<Advert> memory = memory(location);
Observable<Advert> network = network(location);
Observable<Advert> networkWithSave = network.doOnNext(new Action1<Advert>() {
@Override
public void call(Advert advert) {
List<Long> ids = new ArrayList<Long>();
ids.add(advert.getId());
advertService.cache(location, ids);
}
});
// Retrieve the first source with data - concat checks in order
Observable<Advert> source = Observable.concat(memory,
networkWithSave)
.first();
Observable<List<Advert>> listObservable = networkWithSave.toList();
final Func1<List<Advert>, List<Long>> transformer = new Func1<List<Advert>, List<Long>>() {
@Override
public List<Long> call(List<Advert> adverts) {
List<Long> ids = new ArrayList<Long>();
for (Advert advert : adverts) {
ids.add(advert.getId());
}
return ids;
}
};
listObservable.map(transformer).subscribe(new Action1<List<Long>>() {
@Override
public void call(List<Long> ids) {
logger.info("ids {}", ids);
}
});
return source;
}
我会做的是使用过滤器来确保不会发出缓存的旧内容,以便 concat 跳转到网络调用:
Subject<Pair<Long, List<Advert>>, Pair<Long, List<Advert>>> cache =
BehaviorSubject.create().toSerialized();
static final long RETENTION_TIME = 10L * 60 * 1000;
Observable<Advert> memory = cache.filter(v ->
v.first + RETENTION_TIME > System.currentTimeMillis()).flatMapIterable(v -> v);
Observable<Advert> network = ...
Observable<Advert> networkWithSave = network.toList().doOnNext(v ->
cache.onNext(Pair.of(System.currentTimeMillis(), v)).flatMapIterable(v -> v)
);
return memory.switchIfEmpty(network);
好的,我想我有一个适合我的解决方案。我可能忽略了一些东西,但这应该很容易?
public Observable<Advert> getAdverts(final String location) {
Observable<Advert> memory = memory(location);
final Observable<Advert> network = network(location);
final Func1<List<Advert>, List<Long>> advertToIdTransformer = convertAdvertsToIds();
memory.isEmpty().subscribe(new Action1<Boolean>() {
@Override
public void call(Boolean aBoolean) {
if (aBoolean.equals(Boolean.TRUE)) {
Observable<List<Long>> listObservable = network.toList().map(advertToIdTransformer);
listObservable.subscribe(new Action1<List<Long>>() {
@Override
public void call(List<Long> ids) {
logger.info("Caching ids {}", ids);
advertService.cache(location, ids);
}
});
}
}
});
// Retrieve the first source with data - concat checks in order
Observable<Advert> source = Observable.concat(memory,
network)
.first();
return source;
}
我需要一些关于 RxJava 的帮助。我有一个昂贵的网络调用,其中 returns 一个 Observable(来自 elasticsearch 的广告流)。我想将每个发出的项目 (Advert) 的 ID 属性 缓存 10 分钟(在 Redis 中),以便接下来的 10 分钟内的后续调用使用 Cache 中的 ID 从 Elasticsearch 中获取广告。
我有一些代码 - 这在某种程度上实现了预期的结果,(归功于以下博客 .. http://blog.danlew.net/2015/06/22/loading-data-from-multiple-sources-with-rxjava/)
它可以缓存流中的每个发出的项目,我需要的是将流中这些项目的所有 ID 缓存为 1 个缓存条目
到目前为止的代码在这里 https://github.com/tonymurphy/rxjava 对于任何感兴趣的人,下面的代码片段
@Component
public class CachingObservable {
private final Logger logger = LoggerFactory.getLogger(CachingObservable.class);
@Autowired
private AdvertService advertService;
// Each "network" response is different
private Long requestNumber = 0L;
public Observable<Advert> getAdverts(final String location) {
Observable<Advert> memory = memory(location);
Observable<Advert> network = network(location);
Observable<Advert> networkWithSave = network.doOnNext(new Action1<Advert>() {
@Override
public void call(Advert advert) {
List<Long> ids = new ArrayList<Long>();
ids.add(advert.getId());
advertService.cache(location, ids);
}
});
// Retrieve the first source with data - concat checks in order
Observable<Advert> source = Observable.concat(memory,
networkWithSave)
.first();
return source;
}
根据我的理解,concat 方法对我的用例并不是很有用。我需要知道 if/when 网络可观察完成,我需要获取返回的广告 ID 列表,我需要将它们存储在缓存中。我可以订阅网络 observable - 但我希望它是惰性的 - 只有在缓存中找不到数据时才会调用。所以以下更新的代码不起作用..任何想法表示赞赏
public Observable<Advert> getAdverts(final String location) {
Observable<Advert> memory = memory(location);
Observable<Advert> network = network(location);
Observable<Advert> networkWithSave = network.doOnNext(new Action1<Advert>() {
@Override
public void call(Advert advert) {
List<Long> ids = new ArrayList<Long>();
ids.add(advert.getId());
advertService.cache(location, ids);
}
});
// Retrieve the first source with data - concat checks in order
Observable<Advert> source = Observable.concat(memory,
networkWithSave)
.first();
Observable<List<Advert>> listObservable = networkWithSave.toList();
final Func1<List<Advert>, List<Long>> transformer = new Func1<List<Advert>, List<Long>>() {
@Override
public List<Long> call(List<Advert> adverts) {
List<Long> ids = new ArrayList<Long>();
for (Advert advert : adverts) {
ids.add(advert.getId());
}
return ids;
}
};
listObservable.map(transformer).subscribe(new Action1<List<Long>>() {
@Override
public void call(List<Long> ids) {
logger.info("ids {}", ids);
}
});
return source;
}
我会做的是使用过滤器来确保不会发出缓存的旧内容,以便 concat 跳转到网络调用:
Subject<Pair<Long, List<Advert>>, Pair<Long, List<Advert>>> cache =
BehaviorSubject.create().toSerialized();
static final long RETENTION_TIME = 10L * 60 * 1000;
Observable<Advert> memory = cache.filter(v ->
v.first + RETENTION_TIME > System.currentTimeMillis()).flatMapIterable(v -> v);
Observable<Advert> network = ...
Observable<Advert> networkWithSave = network.toList().doOnNext(v ->
cache.onNext(Pair.of(System.currentTimeMillis(), v)).flatMapIterable(v -> v)
);
return memory.switchIfEmpty(network);
好的,我想我有一个适合我的解决方案。我可能忽略了一些东西,但这应该很容易?
public Observable<Advert> getAdverts(final String location) {
Observable<Advert> memory = memory(location);
final Observable<Advert> network = network(location);
final Func1<List<Advert>, List<Long>> advertToIdTransformer = convertAdvertsToIds();
memory.isEmpty().subscribe(new Action1<Boolean>() {
@Override
public void call(Boolean aBoolean) {
if (aBoolean.equals(Boolean.TRUE)) {
Observable<List<Long>> listObservable = network.toList().map(advertToIdTransformer);
listObservable.subscribe(new Action1<List<Long>>() {
@Override
public void call(List<Long> ids) {
logger.info("Caching ids {}", ids);
advertService.cache(location, ids);
}
});
}
}
});
// Retrieve the first source with data - concat checks in order
Observable<Advert> source = Observable.concat(memory,
network)
.first();
return source;
}