使用 Observable SelectMany 处理 Reactive Extensions 错误
Reactive Extensions error handling with Observable SelectMany
我正在尝试使用响应式扩展库在特定文件夹上编写文件观察器
这个想法是监视硬盘驱动器文件夹中的新文件,等待文件完全写入并将事件推送给订阅者。我不想使用 FileSystemWatcher
因为它会为同一个文件引发两次 Changed 事件。
所以我把它写在 "reactive way"(我希望)中,如下所示:
var provider = new MessageProviderFake();
var source = Observable.Interval(TimeSpan.FromSeconds(2), NewThreadScheduler.Default).SelectMany(_ => provider.GetFiles());
using (source.Subscribe(_ => Console.WriteLine(_.Name), () => Console.WriteLine("completed to Console")))
{
Console.WriteLine("press Enter to stop");
Console.ReadLine();
}
但是我找不到 "reactive way" 来处理错误。例如,文件目录可能位于外部驱动器上,但由于连接问题而变得不可用。
所以我添加了 GetFilesSafe
来处理来自 Reactive Extensions 的异常错误:
static IEnumerable<MessageArg> GetFilesSafe(IMessageProvider provider)
{
try
{
return provider.GetFiles();
}
catch (Exception e)
{
Console.WriteLine(e.Message);
return new MessageArg[0];
}
}
并像
一样使用它
var source = Observable.Interval(TimeSpan.FromSeconds(2), NewThreadScheduler.Default).SelectMany(_ => GetFilesSafe(provider));
有没有更好的方法让 SelectMany
调用 provider.GetFiles()
即使在引发异常时也是如此?在这种情况下,我使用错误计数器重复读取操作 N 次,然后失败(终止进程)。
Reactive Extensions中有"try N time and wait Q seconds between attempts"吗?
GetFilesSafe
也有一个问题:它 returns IEnumerable<MessageArg>
用于懒惰阅读但是它可以在迭代时引发并且异常将在 SelectMany
的某处抛出
有一个 Retry
扩展,它只是在当前一个错误时再次订阅可观察对象,但听起来它不能提供您想要的灵活性。
您可以使用 Catch
构建一些东西,如果外部发生错误,它会订阅您提供给它的可观察对象。类似于以下内容(未经测试):
IObservable<Thing> GetFilesObs(int times, bool delay) {
return Observable
.Return(0)
.Delay(TimeSpan.FromSeconds(delay ? <delay_time> : 0))
.SelectMany(_ => Observable.Defer(() => GetFilesErroringObservable()))
.Catch(Observable.Defer(() => GetFilesObs(times - 1, true)));
}
// call with:
GetFilesObs(<number_of_tries>, false);
正如所写,除了触发重试之外,这对错误没有任何作用。特别是,当发生了足够多的错误时,它将完全没有错误地完成,这可能不是您想要的。
我正在尝试使用响应式扩展库在特定文件夹上编写文件观察器
这个想法是监视硬盘驱动器文件夹中的新文件,等待文件完全写入并将事件推送给订阅者。我不想使用 FileSystemWatcher
因为它会为同一个文件引发两次 Changed 事件。
所以我把它写在 "reactive way"(我希望)中,如下所示:
var provider = new MessageProviderFake();
var source = Observable.Interval(TimeSpan.FromSeconds(2), NewThreadScheduler.Default).SelectMany(_ => provider.GetFiles());
using (source.Subscribe(_ => Console.WriteLine(_.Name), () => Console.WriteLine("completed to Console")))
{
Console.WriteLine("press Enter to stop");
Console.ReadLine();
}
但是我找不到 "reactive way" 来处理错误。例如,文件目录可能位于外部驱动器上,但由于连接问题而变得不可用。
所以我添加了 GetFilesSafe
来处理来自 Reactive Extensions 的异常错误:
static IEnumerable<MessageArg> GetFilesSafe(IMessageProvider provider)
{
try
{
return provider.GetFiles();
}
catch (Exception e)
{
Console.WriteLine(e.Message);
return new MessageArg[0];
}
}
并像
一样使用它var source = Observable.Interval(TimeSpan.FromSeconds(2), NewThreadScheduler.Default).SelectMany(_ => GetFilesSafe(provider));
有没有更好的方法让 SelectMany
调用 provider.GetFiles()
即使在引发异常时也是如此?在这种情况下,我使用错误计数器重复读取操作 N 次,然后失败(终止进程)。
Reactive Extensions中有"try N time and wait Q seconds between attempts"吗?
GetFilesSafe
也有一个问题:它 returns IEnumerable<MessageArg>
用于懒惰阅读但是它可以在迭代时引发并且异常将在 SelectMany
的某处抛出
有一个 Retry
扩展,它只是在当前一个错误时再次订阅可观察对象,但听起来它不能提供您想要的灵活性。
您可以使用 Catch
构建一些东西,如果外部发生错误,它会订阅您提供给它的可观察对象。类似于以下内容(未经测试):
IObservable<Thing> GetFilesObs(int times, bool delay) {
return Observable
.Return(0)
.Delay(TimeSpan.FromSeconds(delay ? <delay_time> : 0))
.SelectMany(_ => Observable.Defer(() => GetFilesErroringObservable()))
.Catch(Observable.Defer(() => GetFilesObs(times - 1, true)));
}
// call with:
GetFilesObs(<number_of_tries>, false);
正如所写,除了触发重试之外,这对错误没有任何作用。特别是,当发生了足够多的错误时,它将完全没有错误地完成,这可能不是您想要的。