Reactive Extension 中的数据服务

Data service in Reactive Extension

我想要一个通用的 class 用于数据处理创建、更新和删除的内存缓存。底层模型继承自具有字符串类型 Id 的接口。

interface IModel
{
  string Id { get; }
}

处理创建和更新很容易。例如,如果我想订阅流并填充字典,我知道如果模型 ID 不存在则需要创建,否则就是更新。

我的问题是:
在不引入另一个 class 来包装我的模型 的情况下,您将如何处理删除 ?我想保留 IObservable<TModel>,而不是 IObservable<Event<TModel>>IObservable<Pair<string, TModel>> 之类的东西,但我不知道该怎么做。这可能吗?

interface IDataService<TModel>
{
  IObservable<TModel> DataStream { get; }
}

正如@Enigmativity 建议的那样,您可以使用嵌套的可观察序列来解决这个问题。 IntroToRx 的 Sequences of coincidence 部分对此进行了介绍。

这将如何运作?

您可以将嵌套序列想象成二维数组,或者更具体地说是 jagged array。 外部序列是内部序列的容器。 内部序列的到来表示创建模型。

interface IDataService<TModel>
{
    IObservable<IObservable<TModel>> DataStream { get; }
}

一旦你有了一个内部序列,它产生的所有值都是更新(第一个除外)。 内部序列只会为单个 id 生成更新。 当内部序列完成时,表示 删除 .

如上文 link 开头段落所述,此模式适用于各种用例。

作为弹珠图,您将得到如下所示的内容。 每行代表一个内部序列。

m1  1---2----3--|
m2     a----|
m3       x----y---z--

这将导致以下逻辑流程:

  1. 创建状态为“1”的 m1
  2. 创建状态为'a'
  3. 的m2
  4. 用值“2”更新 m1
  5. 创建 m3 值 'x'
  6. 删除 m2
  7. 用值“3”更新 m1
  8. 用值 'y'
  9. 更新 m3
  10. 删除 m1
  11. 用值 'z'
  12. 更新 m3