响应式 ("Rx") 缓存和更新数据的方式
Reactive ("Rx") way to cache and update data
我有一个从数据库读取数据并通过 IObservable 发布该数据的例程。
数据发布后,我想更新所有刚刚发布的行以阻止它们再次发布。
我不确定 "reactive" 这样做的方法。 (每次我重新访问 Rx 时,我似乎都会遇到这个问题!)
我想我需要做两件事
1) 缓存发布的数据,因为它会有我需要随后更新的 ID - 我想知道是否使用主题来缓存正在发布的数据,或者是否有一些其他的例程来包装我目前拥有的,订阅它,缓存它然后重新发布
2) 发布后更新数据。我真的不确定如何将其构建到管道中!
我根据我在网上找到的各种东西构建了这个(特别是 Lee Campbell 的数据库轮询 - 谢谢 Lee!)但是我添加的其他部分是我的,我可能有大错特错。如果某些部分可以更好地实现非反应性,我愿意接受建议。例如,我已经使数据库更新例程可观察,但我不知道它是否真的有必要 - 或者如果以这种方式实现它是否更容易将它包含在管道中..
这是相关的代码位...
private IObservable<INotification> Poller() =>
Observable
.Timer(_pollingPeriod, _scheduler)
.SelectMany(_ => NewNotifications(_cx))
.Timeout(_pollingPeriod + _queryTimeout, Observable.Return(TimeOut.Notification()), _scheduler)
.Catch<INotification, Exception>(err => Observable.Return(Error.Notification(err)))
.Repeat();
private IObservable<INotification> NewNotifications(string cx)
{
try
{
return SqlRead<INotification>(cx, NewNotificationsSql(),sdr => EventBuilder(sdr), Empty.Notification());
}
catch (Exception ex)
{
throw ex;
}
}
internal static IObservable<T> SqlRead<T>(string cx, string sql, Func<SqlDataReader, T> mapper, T noRows) =>
Observable.Create<T>(o =>
{
using (var conn = new SqlConnection(cx))
{
conn.Open();
using (var cmd = new SqlCommand(sql, conn))
{
using (var rdr = cmd.ExecuteReader())
{
if (!rdr.HasRows)
{
o.OnNext(noRows);
}
else
{
while (rdr.Read())
{
o.OnNext(mapper(rdr));
}
}
}
}
}
o.OnCompleted();
return Disposable.Empty;
});
internal static IObservable<int> SqlWrite(string cx, string sql) =>
Observable.Create<int>(o =>
{
using (var conn = new SqlConnection(cx))
{
conn.Open();
using (var cmd = new SqlCommand(sql, conn))
{
o.OnNext(cmd.ExecuteNonQuery());
}
}
o.OnCompleted();
return Disposable.Empty;
});
假设您有一个 UI 和当前通知列表
public class NotificationListViewModel
{
ObservableCollection<INotification> Items {get;}
}
要维护此集合,您需要了解通知的变化方式。
让我们有一个 class 来显示变化
enum ChangeType
{
Add,
Remove,
Update
}
class Change<T>
{
ChangeType Type {get;}
T Value {get;}
}
现在您可以通过更改公开通知
INotificationProvider
{
public IObservable<Change<INotification>> Notifications {get;}
}
为了方便起见,让我们在 INotification
中添加方法 Update
public class NotificationListViewModel
{
public NotificationListViewModel(INotificationProvider provider)
{
provider.Notifications.Subscribe(change =>
{
if(change.Type == ChangeType.Add)
{
Items.Add(change.Value);
}
if(change.Type == ChangeType.Update)
{
Items.First(x => x.Id = change.Value.Id).Update(change.Value);
}
if(change.Type == ChangeType.Remove)
{
Items.Remove(change.Value);
}
});
}
}
要了解您收到的更改类型,您需要维护从数据库中读取的现有通知列表。
我写的所有这些代码都是为了展示 "how can you think about this things" 的例子。
有一个很棒的库 DynamicData,所有这些想法都以方便和优化的方式实现。
我有一个从数据库读取数据并通过 IObservable 发布该数据的例程。
数据发布后,我想更新所有刚刚发布的行以阻止它们再次发布。
我不确定 "reactive" 这样做的方法。 (每次我重新访问 Rx 时,我似乎都会遇到这个问题!)
我想我需要做两件事
1) 缓存发布的数据,因为它会有我需要随后更新的 ID - 我想知道是否使用主题来缓存正在发布的数据,或者是否有一些其他的例程来包装我目前拥有的,订阅它,缓存它然后重新发布
2) 发布后更新数据。我真的不确定如何将其构建到管道中!
我根据我在网上找到的各种东西构建了这个(特别是 Lee Campbell 的数据库轮询 - 谢谢 Lee!)但是我添加的其他部分是我的,我可能有大错特错。如果某些部分可以更好地实现非反应性,我愿意接受建议。例如,我已经使数据库更新例程可观察,但我不知道它是否真的有必要 - 或者如果以这种方式实现它是否更容易将它包含在管道中..
这是相关的代码位...
private IObservable<INotification> Poller() =>
Observable
.Timer(_pollingPeriod, _scheduler)
.SelectMany(_ => NewNotifications(_cx))
.Timeout(_pollingPeriod + _queryTimeout, Observable.Return(TimeOut.Notification()), _scheduler)
.Catch<INotification, Exception>(err => Observable.Return(Error.Notification(err)))
.Repeat();
private IObservable<INotification> NewNotifications(string cx)
{
try
{
return SqlRead<INotification>(cx, NewNotificationsSql(),sdr => EventBuilder(sdr), Empty.Notification());
}
catch (Exception ex)
{
throw ex;
}
}
internal static IObservable<T> SqlRead<T>(string cx, string sql, Func<SqlDataReader, T> mapper, T noRows) =>
Observable.Create<T>(o =>
{
using (var conn = new SqlConnection(cx))
{
conn.Open();
using (var cmd = new SqlCommand(sql, conn))
{
using (var rdr = cmd.ExecuteReader())
{
if (!rdr.HasRows)
{
o.OnNext(noRows);
}
else
{
while (rdr.Read())
{
o.OnNext(mapper(rdr));
}
}
}
}
}
o.OnCompleted();
return Disposable.Empty;
});
internal static IObservable<int> SqlWrite(string cx, string sql) =>
Observable.Create<int>(o =>
{
using (var conn = new SqlConnection(cx))
{
conn.Open();
using (var cmd = new SqlCommand(sql, conn))
{
o.OnNext(cmd.ExecuteNonQuery());
}
}
o.OnCompleted();
return Disposable.Empty;
});
假设您有一个 UI 和当前通知列表
public class NotificationListViewModel
{
ObservableCollection<INotification> Items {get;}
}
要维护此集合,您需要了解通知的变化方式。 让我们有一个 class 来显示变化
enum ChangeType
{
Add,
Remove,
Update
}
class Change<T>
{
ChangeType Type {get;}
T Value {get;}
}
现在您可以通过更改公开通知
INotificationProvider
{
public IObservable<Change<INotification>> Notifications {get;}
}
为了方便起见,让我们在 INotification
中添加方法 Update
public class NotificationListViewModel
{
public NotificationListViewModel(INotificationProvider provider)
{
provider.Notifications.Subscribe(change =>
{
if(change.Type == ChangeType.Add)
{
Items.Add(change.Value);
}
if(change.Type == ChangeType.Update)
{
Items.First(x => x.Id = change.Value.Id).Update(change.Value);
}
if(change.Type == ChangeType.Remove)
{
Items.Remove(change.Value);
}
});
}
}
要了解您收到的更改类型,您需要维护从数据库中读取的现有通知列表。
我写的所有这些代码都是为了展示 "how can you think about this things" 的例子。 有一个很棒的库 DynamicData,所有这些想法都以方便和优化的方式实现。