如何合并两个早期完成的可观察对象
How to merge two observables with early completion
内置 Merge
运算符的行为是在 两个 源完成时完成。我正在搜索此运算符的变体,它会生成一个可观察对象,该可观察对象会在两个源可观察对象的 any 完成时完成。例如,如果第一个 observable 成功完成,稍后 第二个 observable 完成但出现异常,我希望忽略此异常。
我想出了一个实现,将一个特殊的哨兵异常连接到两个可枚举,然后合并的序列捕获并抑制这个异常。我想知道我是否缺少更简单的解决方案。
/// <summary>
/// Merges elements from two observable sequences into a single observable sequence,
/// that completes as soon as any of the source observable sequences completes.
/// </summary>
public static IObservable<T> MergeUntilAnyCompletes<T>(this IObservable<T> first,
IObservable<T> second)
{
var sentinel = new Exception();
first = first.Concat(Observable.Throw<T>(sentinel));
second = second.Concat(Observable.Throw<T>(sentinel));
// Concat: Concatenates the second observable sequence to the first
// observable sequence upon successful termination of the first.
return first.Merge(second)
.Catch(handler: (Exception ex) =>
ex == sentinel ? Observable.Empty<T>() : Observable.Throw<T>(ex));
// Catch: Continues an observable sequence that is terminated by an exception
// of the specified type with the observable sequence produced by the handler.
}
有趣的技巧:
public static IObservable<T> MergeUntilAnyCompletes<T>(this IObservable<T> first,
IObservable<T> second)
{
return Observable.Merge(
first.Materialize(),
second.Materialize()
).Dematerialize();
}
Materialize
将 observable 变成通知的 observable,因此 Merge
将不再抑制 OnCompleted
通知。当您 Dematerialize
时,该操作员将看到 OnCompleted
并停止。
旁注:如果你想要一些有趣的、稍微学术一点的关于 Materialize
/Dematerialize
的阅读,请阅读 this blog post。他写的是 Ix,但同样适用于 Rx。
内置 Merge
运算符的行为是在 两个 源完成时完成。我正在搜索此运算符的变体,它会生成一个可观察对象,该可观察对象会在两个源可观察对象的 any 完成时完成。例如,如果第一个 observable 成功完成,稍后 第二个 observable 完成但出现异常,我希望忽略此异常。
我想出了一个实现,将一个特殊的哨兵异常连接到两个可枚举,然后合并的序列捕获并抑制这个异常。我想知道我是否缺少更简单的解决方案。
/// <summary>
/// Merges elements from two observable sequences into a single observable sequence,
/// that completes as soon as any of the source observable sequences completes.
/// </summary>
public static IObservable<T> MergeUntilAnyCompletes<T>(this IObservable<T> first,
IObservable<T> second)
{
var sentinel = new Exception();
first = first.Concat(Observable.Throw<T>(sentinel));
second = second.Concat(Observable.Throw<T>(sentinel));
// Concat: Concatenates the second observable sequence to the first
// observable sequence upon successful termination of the first.
return first.Merge(second)
.Catch(handler: (Exception ex) =>
ex == sentinel ? Observable.Empty<T>() : Observable.Throw<T>(ex));
// Catch: Continues an observable sequence that is terminated by an exception
// of the specified type with the observable sequence produced by the handler.
}
有趣的技巧:
public static IObservable<T> MergeUntilAnyCompletes<T>(this IObservable<T> first,
IObservable<T> second)
{
return Observable.Merge(
first.Materialize(),
second.Materialize()
).Dematerialize();
}
Materialize
将 observable 变成通知的 observable,因此 Merge
将不再抑制 OnCompleted
通知。当您 Dematerialize
时,该操作员将看到 OnCompleted
并停止。
旁注:如果你想要一些有趣的、稍微学术一点的关于 Materialize
/Dematerialize
的阅读,请阅读 this blog post。他写的是 Ix,但同样适用于 Rx。