Rx:订阅异步函数并忽略错误
Rx: subscribe with async function and ignore errors
我想为可观察对象中的每个项目调用一个异步函数。正如回答 here,解决方案是使用 SelectMany
。但是,如果异步方法抛出异常,订阅将终止。我有以下似乎有效的解决方案:
obs.SelectMany(x => Observable
.FromAsync(() => RunAsync())
.Catch(Observable.Empty<string>()));
是否有更惯用的解决方案?
您也可以使用 OnErrorResumeNext。
obs.SelectMany(x => Observable
.FromAsync(() => RunAsync())
.OnErrorResumeNext(Observable.Empty<string>()));
有一种标准方法可以观察 RunAsync
调用中发生的异常,那就是使用 .Materialize()
.
.Materialize()
方法将 IObservable<T>
序列转换为 IObservable<Notification<T>>
序列,您可以在其中对 OnNext
、OnError
和 [=20] 进行推理=]来电。
我写了这个查询:
var obs = Observable.Range(0, 10);
obs
.SelectMany(x =>
Observable
.FromAsync(() => RunAsync())
.Materialize())
.Where(x => x.Kind != NotificationKind.OnCompleted)
.Select(x => x.HasValue ? x.Value : (x.Exception.Message + "!"))
.Subscribe(x => x.Dump());
有了这个支持代码:
private int counter = 0;
private Random rnd = new Random();
private System.Threading.Tasks.Task<string> RunAsync()
{
return System.Threading.Tasks.Task.Factory.StartNew(() =>
{
System.Threading.Interlocked.Increment(ref counter);
if (rnd.NextDouble() < 0.3)
{
throw new Exception(counter.ToString());
}
return counter.ToString();
});
}
当我 运行 它时,我得到这种输出:
2
4
5
1!
6
7
3!
10
8!
9
以 !
结尾的每一行都是对导致异常的 RunAsync
的调用。
我想为可观察对象中的每个项目调用一个异步函数。正如回答 here,解决方案是使用 SelectMany
。但是,如果异步方法抛出异常,订阅将终止。我有以下似乎有效的解决方案:
obs.SelectMany(x => Observable
.FromAsync(() => RunAsync())
.Catch(Observable.Empty<string>()));
是否有更惯用的解决方案?
您也可以使用 OnErrorResumeNext。
obs.SelectMany(x => Observable
.FromAsync(() => RunAsync())
.OnErrorResumeNext(Observable.Empty<string>()));
有一种标准方法可以观察 RunAsync
调用中发生的异常,那就是使用 .Materialize()
.
.Materialize()
方法将 IObservable<T>
序列转换为 IObservable<Notification<T>>
序列,您可以在其中对 OnNext
、OnError
和 [=20] 进行推理=]来电。
我写了这个查询:
var obs = Observable.Range(0, 10);
obs
.SelectMany(x =>
Observable
.FromAsync(() => RunAsync())
.Materialize())
.Where(x => x.Kind != NotificationKind.OnCompleted)
.Select(x => x.HasValue ? x.Value : (x.Exception.Message + "!"))
.Subscribe(x => x.Dump());
有了这个支持代码:
private int counter = 0;
private Random rnd = new Random();
private System.Threading.Tasks.Task<string> RunAsync()
{
return System.Threading.Tasks.Task.Factory.StartNew(() =>
{
System.Threading.Interlocked.Increment(ref counter);
if (rnd.NextDouble() < 0.3)
{
throw new Exception(counter.ToString());
}
return counter.ToString();
});
}
当我 运行 它时,我得到这种输出:
2
4
5
1!
6
7
3!
10
8!
9
以 !
结尾的每一行都是对导致异常的 RunAsync
的调用。