Rx.NET 是否有组合运算符结合了 Catch 和 Concat 的优点?

Rx.NET is there a combo operator that combines the goodness of Catch and Concat?

我有一个 observable,我需要在通知之间添加一个时间延迟,以防两个通知彼此距离太近,所以我这样做了。

var niceAndSlowObservable = 
observable
  .Select(x => Observable.Return(x))
  .Delay(1000)
  .Concat();

效果棒极了,无论项目发出的速度有多快,我每秒收到的通知都不会超过 1 个。

现在,我有一些 processing/transformation 正在进行,可能会导致异常,所以我想使用 catch。

niceAndSlowObservable
   .Select(x => TransformationThatCouldCauseException(x))
   .Catch<MyType, Exception>(x => 
               { 
                 HandleError(x);
                 return Observable.Empty<MyType>();
                });

上面的问题是,如果出现错误,整个流都会停止,而不仅仅是继续到下一个。在 Concat() 调用之前我确实有“流的流”,所以从技术上讲,下一个应该成为我正在考虑的后备,

基本上类似于下面的代码,但没有使用 try-catch 块和 Rx 运算符。

niceAndSlowObservable
   .Select(x => { 
          try { TransformationThatCouldCauseException(x); } catch () {return null;})
   .Where(x => x!= null);

我该怎么做?

Procmon 和我都找到了一种彼此相似的方法。这是我能够完成的,可能不是最好的但有效。

    var immortalObservable = 
    niceAndSlowObservable
       .Select(x => Observable.Start(() => TransformationThatCouldCauseException(x))
              .Catch<MyType, Exception>(ex => { LogError(ex)); return Observable.Empty<MyType>();}
       .Concat();

感谢 Procmon's 意见。

如果你想打破错误结束可观察对象的约定,并且你想显式处理异常,那么方法是使用 .Materialize().Dematerialize().

试试这个:

IObservable<int> immortalObservable =
    niceAndSlowObservable
        .SelectMany(x =>
            Observable
                .Start(() => TransformationThatCouldCauseException(x))
                .Materialize()
                .Select(x =>
                {
                    if (x.Kind == NotificationKind.OnError)
                    {
                        HandleError(x.Exception);
                        return Notification.CreateOnCompleted<int>()
                    }
                    return x;
                })
                .Dematerialize());

这使您可以操纵流并将错误更改为正常的已完成消息。您有机会处理实际的异常。

请注意,因为我需要在 Notification.CreateOnCompleted<int>() 中提供类型,所以我选择了 int。您需要为您的可观察对象使用正确的类型。