RxJava 缓存流中网络调用 ID 的结果(Redis 或类似的缓存解决方案)

RxJava Caching Results of Network Call IDs from a Stream (Redis or similar caching solution)

我需要一些关于 RxJava 的帮助。我有一个昂贵的网络调用,其中 returns 一个 Observable(来自 elasticsearch 的广告流)。我想将每个发出的项目 (Advert) 的 ID 属性 缓存 10 分钟(在 Redis 中),以便接下来的 10 分钟内的后续调用使用 Cache 中的 ID 从 Elasticsearch 中获取广告。

我有一些代码 - 这在某种程度上实现了预期的结果,(归功于以下博客 .. http://blog.danlew.net/2015/06/22/loading-data-from-multiple-sources-with-rxjava/)

它可以缓存流中的每个发出的项目,我需要的是将流中这些项目的所有 ID 缓存为 1 个缓存条目

到目前为止的代码在这里 https://github.com/tonymurphy/rxjava 对于任何感兴趣的人,下面的代码片段

@Component
public class CachingObservable {

    private final Logger logger = LoggerFactory.getLogger(CachingObservable.class);

    @Autowired
    private AdvertService advertService;

    // Each "network" response is different
    private Long requestNumber = 0L;

    public Observable<Advert> getAdverts(final String location) {

        Observable<Advert> memory = memory(location);
        Observable<Advert> network = network(location);

        Observable<Advert> networkWithSave = network.doOnNext(new Action1<Advert>() {
            @Override
            public void call(Advert advert) {
                List<Long> ids = new ArrayList<Long>();
                ids.add(advert.getId());
                advertService.cache(location, ids);
            }
        });

        // Retrieve the first source with data - concat checks in order
        Observable<Advert> source = Observable.concat(memory,
                networkWithSave)
                .first();

        return source;
    }

根据我的理解,concat 方法对我的用例并不是很有用。我需要知道 if/when 网络可观察完成,我需要获取返回的广告 ID 列表,我需要将它们存储在缓存中。我可以订阅网络 observable - 但我希望它是惰性的 - 只有在缓存中找不到数据时才会调用。所以以下更新的代码不起作用..任何想法表示赞赏

public Observable<Advert> getAdverts(final String location) {

    Observable<Advert> memory = memory(location);
    Observable<Advert> network = network(location);

    Observable<Advert> networkWithSave = network.doOnNext(new Action1<Advert>() {
        @Override
        public void call(Advert advert) {
            List<Long> ids = new ArrayList<Long>();
            ids.add(advert.getId());
            advertService.cache(location, ids);
        }
    });

    // Retrieve the first source with data - concat checks in order
    Observable<Advert> source = Observable.concat(memory,
            networkWithSave)
            .first();

    Observable<List<Advert>> listObservable = networkWithSave.toList();
    final Func1<List<Advert>, List<Long>> transformer = new Func1<List<Advert>, List<Long>>() {
        @Override
        public List<Long> call(List<Advert> adverts) {
            List<Long> ids = new ArrayList<Long>();
            for (Advert advert : adverts) {
                ids.add(advert.getId());
            }
            return ids;
        }
    };

    listObservable.map(transformer).subscribe(new Action1<List<Long>>() {
        @Override
        public void call(List<Long> ids) {
            logger.info("ids {}", ids);
        }
    });

    return source;
}

我会做的是使用过滤器来确保不会发出缓存的旧内容,以便 concat 跳转到网络调用:

Subject<Pair<Long, List<Advert>>, Pair<Long, List<Advert>>> cache = 
    BehaviorSubject.create().toSerialized();
static final long RETENTION_TIME = 10L * 60 * 1000;

Observable<Advert> memory = cache.filter(v -> 
    v.first + RETENTION_TIME > System.currentTimeMillis()).flatMapIterable(v -> v);

Observable<Advert> network = ...

Observable<Advert> networkWithSave = network.toList().doOnNext(v -> 
    cache.onNext(Pair.of(System.currentTimeMillis(), v)).flatMapIterable(v -> v)
);

return memory.switchIfEmpty(network);

好的,我想我有一个适合我的解决方案。我可能忽略了一些东西,但这应该很容易?

public Observable<Advert> getAdverts(final String location) {

    Observable<Advert> memory = memory(location);
    final Observable<Advert> network = network(location);

    final Func1<List<Advert>, List<Long>> advertToIdTransformer = convertAdvertsToIds();

    memory.isEmpty().subscribe(new Action1<Boolean>() {
        @Override
        public void call(Boolean aBoolean) {
            if (aBoolean.equals(Boolean.TRUE)) {
                Observable<List<Long>> listObservable = network.toList().map(advertToIdTransformer);
                listObservable.subscribe(new Action1<List<Long>>() {
                    @Override
                    public void call(List<Long> ids) {
                        logger.info("Caching ids {}", ids);
                        advertService.cache(location, ids);
                    }
                });
            }
        }
    });

    // Retrieve the first source with data - concat checks in order
    Observable<Advert> source = Observable.concat(memory,
            network)
            .first();


    return source;
}