我应该如何使用 Rx + DynamicData 定期检查来自许多在线服务的更新?

How should I use Rx + DynamicData to periodically check for updates from many online services?

我有一个基本的 Calendar/Agenda 应用程序,可以列出一系列帐户和日历中的最新事件。例如,假设我有 3 个帐户:两个不同的 Microsoft 帐户和一个 Google 帐户。我目前将它们存储为服务 (AccountsService) 中名为 AccountsSourceCache<Account, string>

SourceCache<T1,T2> 是 DynamicData 的一部分……它基本上构成了 Reactive Collections。我希望它具有反应性,这样当我添加或删除帐户时,应用程序中的所有内容(设置页面、日历页面等)都会自动更新。

现在,每个帐户可以有多个 Calendar。对于其中的每个 Calendar,我想下载所有即将发布的 CalendarEvent。问题是我需要定期执行此操作以查看是否已添加新事件或是否已更改事件。

这是我目前的做法,但恐怕 Rx 可能真的很糟糕。

var calendarSet = this.accountsService.Accounts.Connect()
    .ObserveOn(RxApp.TaskpoolScheduler)
    .TransformMany(x =>
    {
        ReadOnlyObservableCollection<Models.Calendar> subCalendars;
        x.CalendarService.Calendars.Connect()
            .AutoRefreshOnObservable(calendar => calendar.IsEnabledChanged)
            .AutoRefreshOnObservable(calendar => calendar.IsColorChanged)
            .Filter(calendar=>calendar.IsEnabled)
            .Bind(out subCalendars)
            .Subscribe();
        return subCalendars;
     }, x => x.CacheKey)
     .ObserveOnDispatcher()
     .Publish();

calendarSet
    .Bind(out calendars)
    .Subscribe();


var eventSet = calendarSet
    .ObserveOn(RxApp.TaskpoolScheduler)
    .Transform( calendar =>
    {
        var events = new List<Models.CalendarEvent>();
        Debug.WriteLine(calendar.Name);
        calendar.CalendarService.CalendarEventsObservable(calendar).Subscribe(items =>
        {
            events.AddRange(items);
        });
        return events;
    })
    .TransformMany(x => x, x => x.Key)
    .Filter(x => x.EndDateTime > DateTimeOffset.Now)
    .Sort(new Models.CalendarEventSorter())
    .ObserveOnDispatcher()
    .Bind(out calendarEvents)
    .Subscribe();

calendarSet.Connect();

最重要的是事件也通过订阅一个可观察对象来加载。那就是我放置计时器的地方,它可以让我控制检查在线服务的频率。看起来像这样(20 秒仅用于测试!):

public IObservable<List<Models.CalendarEvent>> CalendarEventsObservable(Models.Calendar calendar)
    {
        var obs = Observable.Interval(TimeSpan.FromSeconds(20)).SelectMany(async x =>
        {
            var items = await GetAllEventsForCalendarAsync(calendar);
            return items;
        });

        return obs;
    }

这似乎行得通!我可以 enable/disable 某些日历,这些事件会在我的绑定列表中出现或消失。我可以看到事件定期更新……我假设因为我使用的 TransformMany 的密钥绑定到 CalenderEvents 在线 ID(固定),所以新下载的事件只是替换了缓存中的旧的。我在 UI.

上没有看到任何闪烁

**更正:它似乎有效是因为我在另一次试验中不小心遗漏了一个 hack。在原始帐户SourceCache 上,我是运行 调用Accounts.Refresh() 的计时器。如果我把它拿出来,什么都不起作用。

这是正确的方法吗?请启发我......我在 Rx 和 DynamicData 上有点挣扎。运算符太多了,不知道怎么办。

谢谢!

我稍微重写了代码,看起来效果好多了。它甚至可以在没有黑客的情况下工作,我在原始 Account SourceCache.

上手动调用 Refresh()

我意识到我一直在滥用 Publish 方法。此外,在 transform 语句中订阅一个 observable 是行不通的,因为它不是一个 DynamicData observable(带有变更集)。相反,我决定做一个 SubscribeMany 来订阅每个 Calendar 中的所有 CalendarEventObservable,并且在订阅的 Action 逻辑中,我将填充 CalendarEvent 的新 SourceCache秒。不会删除任何事件,但由于缓存键,重复事件只会覆盖旧事件,我可以从未选择的日历中过滤掉事件。

 private SourceCache<Models.CalendarEvent, string> calendarEventCache;
 public IObservableCache<Models.CalendarEvent, string> CalendarEventCache => calendarEventCache.Connect().AutoRefreshOnObservable(x=>x.Parent.IsEnabledChanged).Filter(x=>x.Parent.IsEnabled).AsObservableCache();

//////////////////
///IN CONSTRUCTOR:

        calendarEventCache = new SourceCache<Models.CalendarEvent, string>(x => x.Key);

        calendarsCache = this.accountsService.Accounts.Connect()
            .ObserveOn(RxApp.TaskpoolScheduler)
            .TransformMany(x =>
            {
                ReadOnlyObservableCollection<Models.Calendar> subCalendars;
                x.CalendarService.Calendars.Connect()
                            .AutoRefreshOnObservable(calendar => calendar.IsEnabledChanged)
                            .AutoRefreshOnObservable(calendar => calendar.IsColorChanged)
                            .Filter(calendar => calendar.IsEnabled)
                            .Bind(out subCalendars)
                            .Subscribe();
                return subCalendars;
            }, x => x.CacheKey)
            .ObserveOnDispatcher()
            .AsObservableCache();

        calendarsCache.Connect()
            .ObserveOn(RxApp.TaskpoolScheduler)
            .SubscribeMany(calendar =>
            {
                return calendar.CalendarService.CalendarEventsObservable(calendar).Subscribe(calendarEvent =>
               {
                   calendarEventCache.AddOrUpdate(calendarEvent);
               });
            })
            .Subscribe();

现在,在 UI 的视图模型中,我订阅了 SourceCache<CalendarEvent,string>

您似乎已经回答了您自己的问题,并且代码看起来比原始代码更优化。但是我建议可以进行一些优化:

  1. 使用 AutoRefreshOnObservable,这将允许您创建单个可观察对象,而不是使用两次 AutoRefresh。

  2. 我无法从代码中判断 CalendarEventsObservable returns 是单个值还是集合。如果它 returns 是一个集合,则可以使用 EditDiff 而不是使用 AddOrUpdate。 EditDiff 期望与指定的比较相等,并防止触发不必要的通知。