使用 Observable SelectMany 处理 Reactive Extensions 错误

Reactive Extensions error handling with Observable SelectMany

我正在尝试使用响应式扩展库在特定文件夹上编写文件观察器 这个想法是监视硬盘驱动器文件夹中的新文件,等待文件完全写入并将事件推送给订阅者。我不想使用 FileSystemWatcher 因为它会为同一个文件引发两次 Changed 事件。

所以我把它写在 "reactive way"(我希望)中,如下所示:

var provider = new MessageProviderFake();
var source = Observable.Interval(TimeSpan.FromSeconds(2), NewThreadScheduler.Default).SelectMany(_ => provider.GetFiles());
using (source.Subscribe(_ => Console.WriteLine(_.Name), () => Console.WriteLine("completed to Console")))
{
    Console.WriteLine("press Enter to stop");
    Console.ReadLine();
}

但是我找不到 "reactive way" 来处理错误。例如,文件目录可能位于外部驱动器上,但由于连接问题而变得不可用。 所以我添加了 GetFilesSafe 来处理来自 Reactive Extensions 的异常错误:

static IEnumerable<MessageArg> GetFilesSafe(IMessageProvider provider)
{
    try
    {
        return provider.GetFiles();
    }
    catch (Exception e)
    {
        Console.WriteLine(e.Message);
        return new MessageArg[0];
    }
}

并像

一样使用它
var source = Observable.Interval(TimeSpan.FromSeconds(2), NewThreadScheduler.Default).SelectMany(_ => GetFilesSafe(provider));

有没有更好的方法让 SelectMany 调用 provider.GetFiles() 即使在引发异常时也是如此?在这种情况下,我使用错误计数器重复读取操作 N 次,然后失败(终止进程)。

Reactive Extensions中有"try N time and wait Q seconds between attempts"吗?

GetFilesSafe 也有一个问题:它 returns IEnumerable<MessageArg> 用于懒惰阅读但是它可以在迭代时引发并且异常将在 SelectMany 的某处抛出

有一个 Retry 扩展,它只是在当前一个错误时再次订阅可观察对象,但听起来它不能提供您想要的灵活性。

您可以使用 Catch 构建一些东西,如果外部发生错误,它会订阅您提供给它的可观察对象。类似于以下内容(未经测试):

IObservable<Thing> GetFilesObs(int times, bool delay) {
    return Observable
        .Return(0)
        .Delay(TimeSpan.FromSeconds(delay ? <delay_time> : 0))
        .SelectMany(_ => Observable.Defer(() => GetFilesErroringObservable()))
        .Catch(Observable.Defer(() => GetFilesObs(times - 1, true)));
}

// call with:
GetFilesObs(<number_of_tries>, false);

正如所写,除了触发重试之外,这对错误没有任何作用。特别是,当发生了足够多的错误时,它将完全没有错误地完成,这可能不是您想要的。