Rx:订阅异步函数并忽略错误

Rx: subscribe with async function and ignore errors

我想为可观察对象中的每个项目调用一个异步函数。正如回答 here,解决方案是使用 SelectMany。但是,如果异步方法抛出异常,订阅将终止。我有以下似乎有效的解决方案:

obs.SelectMany(x => Observable
    .FromAsync(() => RunAsync())
    .Catch(Observable.Empty<string>()));

是否有更惯用的解决方案?

您也可以使用 OnErrorResumeNext。

obs.SelectMany(x => Observable
    .FromAsync(() => RunAsync())
    .OnErrorResumeNext(Observable.Empty<string>()));

有一种标准方法可以观察 RunAsync 调用中发生的异常,那就是使用 .Materialize().

.Materialize() 方法将 IObservable<T> 序列转换为 IObservable<Notification<T>> 序列,您可以在其中对 OnNextOnError 和 [=20] 进行推理=]来电。

我写了这个查询:

var obs = Observable.Range(0, 10);

obs
    .SelectMany(x =>
        Observable
            .FromAsync(() => RunAsync())
            .Materialize())
    .Where(x => x.Kind != NotificationKind.OnCompleted)
    .Select(x => x.HasValue ? x.Value : (x.Exception.Message + "!"))
    .Subscribe(x => x.Dump());

有了这个支持代码:

private int counter = 0;
private Random rnd = new Random();

private System.Threading.Tasks.Task<string> RunAsync()
{
    return System.Threading.Tasks.Task.Factory.StartNew(() =>
    {
        System.Threading.Interlocked.Increment(ref counter);
        if (rnd.NextDouble() < 0.3)
        {
            throw new Exception(counter.ToString());
        }
        return counter.ToString();
    });
}

当我 运行 它时,我得到这种输出:

2
4
5
1!
6
7
3!
10
8!
9

! 结尾的每一行都是对导致异常的 RunAsync 的调用。