避免在可观察管道中多次调用
Avoiding multiple calls in observable pipeline
我正在尝试创建一个 GetAndFetch
方法,该方法首先从缓存中 return 数据,然后从网络服务中获取和 return 数据,最后更新缓存。
akavache
中已经存在这样的函数,但是,它检索或存储的数据就像一个blob。也就是说,如果我对 rss
Feed 感兴趣,我只能在整个 Feed 而不是单个项目的级别上工作。我有兴趣创建一个 return 将项目作为 IObservable<Item>
的版本。这样做的好处是新的 Item
可以在 service
编辑 return 时立即显示,而不是等待所有 Items
。
public IObservable<Item> GetAndFetch(IBlobCache cache, string feedUrl)
{
// The basic idea is to first get the cached objects
IObservable<HashSet<Item>> cacheBlobObject = cache.GetObject<HashSet<Item>>(feedUrl);
// Then call the service
IObservable<Item> fetchObs = service.GetItems(feedUrl);
// Consolidate the cache & the retrieved data and then update cache
IObservable<Item> updateObs = fetchObs
.ToArray()
.MyFilter() // filter out duplicates between retried data and cache
.SelectMany(arg =>
{
return cache.InsertObject(feedUrl, arg)
.SelectMany(__ => Observable.Empty<Item>());
});
// Then make sure cache retrieval, fetching and update is done in order
return cacheBlobObject.SelectMany(x => x.ToObservable())
.Concat(fetchObs)
.Concat(upadteObs);
}
我的方法的问题是 Concat(upadteObs)
重新订阅 fetchObs
并最终再次调用 service.GetItems(feedUrl)
,这很浪费。
听起来你需要 .Publish(share => { ... })
过载。
试试这个:
public IObservable<Item> GetAndFetch(IBlobCache cache, string feedUrl)
{
// The basic idea is to first get the cached objects
IObservable<HashSet<Item>> cacheBlobObject = cache.GetObject<HashSet<Item>>(feedUrl);
return
service
.GetItems(feedUrl)
.Publish(fetchObs =>
{
// Consolidate the cache & the retrieved data and then update cache
IObservable<Item> updateObs =
fetchObs
.ToArray()
.MyFilter() // filter out duplicates between retried data and cache
.SelectMany(arg =>
cache
.InsertObject(feedUrl, arg)
.SelectMany(__ => Observable.Empty<Item>()));
// Then make sure cache retrieval, fetching and update is done in order
return
cacheBlobObject
.SelectMany(x => x.ToObservable())
.Concat(fetchObs)
.Concat(updateObs);
});
}
我担心 Concat
电话 - 他们可能需要 Merge
。
此外,您对 service.GetItems
的调用似乎无论如何都会获取所有项目 - 它如何避免缓存中已经存在的项目?
基于评论的替代实现:
public IObservable<Item> GetAndFetch(IBlobCache cache, string feedUrl)
{
return
(
from hs in cache.GetObject<HashSet<Item>>(feedUrl)
let ids = new HashSet<string>(hs.Select(x => x.Id))
select
hs
.ToObservable()
.Merge(
service
.GetItems(feedUrl)
.Where(x => !ids.Contains(x.Id))
.Do(x => cache.InsertObject(feedUrl, new [] { x })))
).Merge();
}
我正在尝试创建一个 GetAndFetch
方法,该方法首先从缓存中 return 数据,然后从网络服务中获取和 return 数据,最后更新缓存。
akavache
中已经存在这样的函数,但是,它检索或存储的数据就像一个blob。也就是说,如果我对 rss
Feed 感兴趣,我只能在整个 Feed 而不是单个项目的级别上工作。我有兴趣创建一个 return 将项目作为 IObservable<Item>
的版本。这样做的好处是新的 Item
可以在 service
编辑 return 时立即显示,而不是等待所有 Items
。
public IObservable<Item> GetAndFetch(IBlobCache cache, string feedUrl)
{
// The basic idea is to first get the cached objects
IObservable<HashSet<Item>> cacheBlobObject = cache.GetObject<HashSet<Item>>(feedUrl);
// Then call the service
IObservable<Item> fetchObs = service.GetItems(feedUrl);
// Consolidate the cache & the retrieved data and then update cache
IObservable<Item> updateObs = fetchObs
.ToArray()
.MyFilter() // filter out duplicates between retried data and cache
.SelectMany(arg =>
{
return cache.InsertObject(feedUrl, arg)
.SelectMany(__ => Observable.Empty<Item>());
});
// Then make sure cache retrieval, fetching and update is done in order
return cacheBlobObject.SelectMany(x => x.ToObservable())
.Concat(fetchObs)
.Concat(upadteObs);
}
我的方法的问题是 Concat(upadteObs)
重新订阅 fetchObs
并最终再次调用 service.GetItems(feedUrl)
,这很浪费。
听起来你需要 .Publish(share => { ... })
过载。
试试这个:
public IObservable<Item> GetAndFetch(IBlobCache cache, string feedUrl)
{
// The basic idea is to first get the cached objects
IObservable<HashSet<Item>> cacheBlobObject = cache.GetObject<HashSet<Item>>(feedUrl);
return
service
.GetItems(feedUrl)
.Publish(fetchObs =>
{
// Consolidate the cache & the retrieved data and then update cache
IObservable<Item> updateObs =
fetchObs
.ToArray()
.MyFilter() // filter out duplicates between retried data and cache
.SelectMany(arg =>
cache
.InsertObject(feedUrl, arg)
.SelectMany(__ => Observable.Empty<Item>()));
// Then make sure cache retrieval, fetching and update is done in order
return
cacheBlobObject
.SelectMany(x => x.ToObservable())
.Concat(fetchObs)
.Concat(updateObs);
});
}
我担心 Concat
电话 - 他们可能需要 Merge
。
此外,您对 service.GetItems
的调用似乎无论如何都会获取所有项目 - 它如何避免缓存中已经存在的项目?
基于评论的替代实现:
public IObservable<Item> GetAndFetch(IBlobCache cache, string feedUrl)
{
return
(
from hs in cache.GetObject<HashSet<Item>>(feedUrl)
let ids = new HashSet<string>(hs.Select(x => x.Id))
select
hs
.ToObservable()
.Merge(
service
.GetItems(feedUrl)
.Where(x => !ids.Contains(x.Id))
.Do(x => cache.InsertObject(feedUrl, new [] { x })))
).Merge();
}