Rx.NET 是否有组合运算符结合了 Catch 和 Concat 的优点?
Rx.NET is there a combo operator that combines the goodness of Catch and Concat?
我有一个 observable,我需要在通知之间添加一个时间延迟,以防两个通知彼此距离太近,所以我这样做了。
var niceAndSlowObservable =
observable
.Select(x => Observable.Return(x))
.Delay(1000)
.Concat();
效果棒极了,无论项目发出的速度有多快,我每秒收到的通知都不会超过 1 个。
现在,我有一些 processing/transformation 正在进行,可能会导致异常,所以我想使用 catch。
niceAndSlowObservable
.Select(x => TransformationThatCouldCauseException(x))
.Catch<MyType, Exception>(x =>
{
HandleError(x);
return Observable.Empty<MyType>();
});
上面的问题是,如果出现错误,整个流都会停止,而不仅仅是继续到下一个。在 Concat() 调用之前我确实有“流的流”,所以从技术上讲,下一个应该成为我正在考虑的后备,
基本上类似于下面的代码,但没有使用 try-catch 块和 Rx 运算符。
niceAndSlowObservable
.Select(x => {
try { TransformationThatCouldCauseException(x); } catch () {return null;})
.Where(x => x!= null);
我该怎么做?
Procmon 和我都找到了一种彼此相似的方法。这是我能够完成的,可能不是最好的但有效。
var immortalObservable =
niceAndSlowObservable
.Select(x => Observable.Start(() => TransformationThatCouldCauseException(x))
.Catch<MyType, Exception>(ex => { LogError(ex)); return Observable.Empty<MyType>();}
.Concat();
感谢 Procmon's
意见。
如果你想打破错误结束可观察对象的约定,并且你想显式处理异常,那么方法是使用 .Materialize()
和 .Dematerialize()
.
试试这个:
IObservable<int> immortalObservable =
niceAndSlowObservable
.SelectMany(x =>
Observable
.Start(() => TransformationThatCouldCauseException(x))
.Materialize()
.Select(x =>
{
if (x.Kind == NotificationKind.OnError)
{
HandleError(x.Exception);
return Notification.CreateOnCompleted<int>()
}
return x;
})
.Dematerialize());
这使您可以操纵流并将错误更改为正常的已完成消息。您有机会处理实际的异常。
请注意,因为我需要在 Notification.CreateOnCompleted<int>()
中提供类型,所以我选择了 int
。您需要为您的可观察对象使用正确的类型。
我有一个 observable,我需要在通知之间添加一个时间延迟,以防两个通知彼此距离太近,所以我这样做了。
var niceAndSlowObservable =
observable
.Select(x => Observable.Return(x))
.Delay(1000)
.Concat();
效果棒极了,无论项目发出的速度有多快,我每秒收到的通知都不会超过 1 个。
现在,我有一些 processing/transformation 正在进行,可能会导致异常,所以我想使用 catch。
niceAndSlowObservable
.Select(x => TransformationThatCouldCauseException(x))
.Catch<MyType, Exception>(x =>
{
HandleError(x);
return Observable.Empty<MyType>();
});
上面的问题是,如果出现错误,整个流都会停止,而不仅仅是继续到下一个。在 Concat() 调用之前我确实有“流的流”,所以从技术上讲,下一个应该成为我正在考虑的后备,
基本上类似于下面的代码,但没有使用 try-catch 块和 Rx 运算符。
niceAndSlowObservable
.Select(x => {
try { TransformationThatCouldCauseException(x); } catch () {return null;})
.Where(x => x!= null);
我该怎么做?
Procmon 和我都找到了一种彼此相似的方法。这是我能够完成的,可能不是最好的但有效。
var immortalObservable =
niceAndSlowObservable
.Select(x => Observable.Start(() => TransformationThatCouldCauseException(x))
.Catch<MyType, Exception>(ex => { LogError(ex)); return Observable.Empty<MyType>();}
.Concat();
感谢 Procmon's
意见。
如果你想打破错误结束可观察对象的约定,并且你想显式处理异常,那么方法是使用 .Materialize()
和 .Dematerialize()
.
试试这个:
IObservable<int> immortalObservable =
niceAndSlowObservable
.SelectMany(x =>
Observable
.Start(() => TransformationThatCouldCauseException(x))
.Materialize()
.Select(x =>
{
if (x.Kind == NotificationKind.OnError)
{
HandleError(x.Exception);
return Notification.CreateOnCompleted<int>()
}
return x;
})
.Dematerialize());
这使您可以操纵流并将错误更改为正常的已完成消息。您有机会处理实际的异常。
请注意,因为我需要在 Notification.CreateOnCompleted<int>()
中提供类型,所以我选择了 int
。您需要为您的可观察对象使用正确的类型。