响应式 ("Rx") 缓存和更新数据的方式

Reactive ("Rx") way to cache and update data

我有一个从数据库读取数据并通过 IObservable 发布该数据的例程。

数据发布后,我想更新所有刚刚发布的行以阻止它们再次发布。

我不确定 "reactive" 这样做的方法。 (每次我重新访问 Rx 时,我似乎都会遇到这个问题!)

我想我需要做两件事

1) 缓存发布的数据,因为它会有我需要随后更新的 ID - 我想知道是否使用主题来缓存正在发布的数据,或者是否有一些其他的例程来包装我目前拥有的,订阅它,缓存它然后重新发布

2) 发布后更新数据。我真的不确定如何将其构建到管道中!

我根据我在网上找到的各种东西构建了这个(特别是 Lee Campbell 的数据库轮询 - 谢谢 Lee!)但是我添加的其他部分是我的,我可能有大错特错。如果某些部分可以更好地实现非反应性,我愿意接受建议。例如,我已经使数据库更新例程可观察,但我不知道它是否真的有必要 - 或者如果以这种方式实现它是否更容易将它包含在管道中..

这是相关的代码位...

private IObservable<INotification> Poller() =>
    Observable
        .Timer(_pollingPeriod, _scheduler)
        .SelectMany(_ => NewNotifications(_cx))                
        .Timeout(_pollingPeriod + _queryTimeout, Observable.Return(TimeOut.Notification()), _scheduler) 
        .Catch<INotification, Exception>(err => Observable.Return(Error.Notification(err))) 
        .Repeat();  

private IObservable<INotification> NewNotifications(string cx)
{
    try
    {                
        return SqlRead<INotification>(cx, NewNotificationsSql(),sdr => EventBuilder(sdr), Empty.Notification());
    }
    catch (Exception ex)
    {
        throw ex;
    }
}

internal static IObservable<T> SqlRead<T>(string cx, string sql, Func<SqlDataReader, T> mapper, T noRows) =>
    Observable.Create<T>(o =>
    {                
        using (var conn = new SqlConnection(cx))
        {
            conn.Open();
            using (var cmd = new SqlCommand(sql, conn))
            {                        
                using (var rdr = cmd.ExecuteReader())
                {
                    if (!rdr.HasRows)
                    {
                        o.OnNext(noRows);
                    }
                    else
                    {
                        while (rdr.Read())
                        {             
                            o.OnNext(mapper(rdr));                                    
                        }
                    }
                }
            }
        }
        o.OnCompleted();                
        return Disposable.Empty;
    });

internal static IObservable<int> SqlWrite(string cx, string sql) =>
    Observable.Create<int>(o =>
    {
        using (var conn = new SqlConnection(cx))
        {
            conn.Open();
            using (var cmd = new SqlCommand(sql, conn))
            {
                o.OnNext(cmd.ExecuteNonQuery());
            }                    
        }
        o.OnCompleted();
        return Disposable.Empty;
    });

假设您有一个 UI 和当前通知列表

public class NotificationListViewModel
{
    ObservableCollection<INotification> Items {get;}
}

要维护此集合,您需要了解通知的变化方式。 让我们有一个 class 来显示变化

enum ChangeType
{
   Add,
   Remove,
   Update
}

class Change<T>
{
    ChangeType Type {get;}
    T Value {get;}
}

现在您可以通过更改公开通知

INotificationProvider
{
   public IObservable<Change<INotification>> Notifications {get;}
}

为了方便起见,让我们在 INotification 中添加方法 Update

public class NotificationListViewModel
{
    public NotificationListViewModel(INotificationProvider provider)
    {
        provider.Notifications.Subscribe(change => 
        {
          if(change.Type == ChangeType.Add)
          {
            Items.Add(change.Value);
          }
          if(change.Type == ChangeType.Update)
          {
            Items.First(x => x.Id = change.Value.Id).Update(change.Value);
          }
          if(change.Type == ChangeType.Remove)
          {
            Items.Remove(change.Value);
          }
        });
    }
}

要了解您收到的更改类型,您需要维护从数据库中读取的现有通知列表。

我写的所有这些代码都是为了展示 "how can you think about this things" 的例子。 有一个很棒的库 DynamicData,所有这些想法都以方便和优化的方式实现。