如何实现等待所有可观察子序列完成的自定义 SelectMany 运算符?
How to implement a custom SelectMany operator that waits for all observable subsequences to complete?
我正在使用 SelectMany
运算符,以便将可观察序列的元素投射到任务,并传播这些任务的结果。如果所有操作都成功,一切都很好,但我不喜欢这样,以防出现异常,所有当前 运行 操作都变得一劳永逸。我宁愿等到所有挂起的操作都完成,然后才收到已发生的错误的通知。这是我想避免的行为的最小示例:
try
{
await Observable
.Interval(TimeSpan.FromMilliseconds(100))
.Select(x => (int)x + 1)
.Take(5)
.SelectMany(x => Observable.FromAsync(async ct =>
{
await Task.Delay(500); // Simulate an I/O operation
if (x == 3) throw new ApplicationException("Oops!");
Console.WriteLine($"Operation #{x} completed");
return x;
}))
.Do(x => Console.WriteLine($"Result: {x}"));
}
catch (Exception ex)
{
Console.WriteLine($"Failed, {ex.GetType().Name}: {ex.Message}");
}
await Task.Delay(1000);
输出(不理想):
Operation #1 completed
Result: 1
Operation #2 completed
Result: 2
Failed, ApplicationException: Oops!
Operation #4 completed
Operation #5 completed
理想的输出应如下所示:
Operation #1 completed
Result: 1
Operation #2 completed
Result: 2
Operation #4 completed
Result: 4
Operation #5 completed
Result: 5
Failed, AggregateException: One or more errors occurred. (Oops!)
序列应该仅在所有其他 运行 操作(#4 和 #5)完成后传播项目 #3 的异常。
在上面的例子中,我故意省略了 Observable.FromAsync
方法的 CancellationToken
参数,因为我想模拟启动的异步操作不可取消的情况,或者它们的取消不是瞬时的。
我正在考虑使用以下签名实现自定义运算符 SelectManyUntilCompletion
:
public static IObservable<TResult> SelectManyUntilCompletion<TSource, TResult>(
this IObservable<TSource> source,
Func<TSource, CancellationToken, IObservable<TResult>> selector);
签名与SelectMany
几乎相同。唯一的区别是 selector
委托有一个 CancellationToken
参数。当任何其他子序列中发生错误时,应取消此标记。这个想法是,操作员不应突然取消订阅子序列,而应传达取消信息,但保持订阅子序列直到它们自然完成。
我的问题是:我该如何实现这个运算符?是否可以基于现有的 SelectMany
来实现它,或者我应该使用 Observable.Create
?
在较低的级别上实现它
下面是运营商的详细说明。如果成功,它的行为应该与 SelectMany
相同,所以我只描述它在错误情况下的行为。
- 当所有子序列都完成时,生成的序列应该完成。
source
序列应该在发生错误后立即取消订阅,这样就不会再创建子序列。
CancellationToken
应该在发生错误后立即发出信号。
- 结果序列应该传播所有子序列在错误 之前和之后 之后产生的所有
TResult
值。
- 结果序列最终应该传播所有发生的错误,捆绑在
AggregateException
中。这包括 source
序列的可能错误。
AggregateException
应该 不 包括任何可能因取消 CancellationToken
而发生的 OperationCanceledException
。
大理石图:
Source: +-----A-----B-------------C---------------D----|
Subsequence-A: +-------------a---------|
Subsequence-B: +---b---------------------X
Subsequence-C: +-------c----------------c----|
Subsequence-D:
Result: +---------------b---a-------------c----------------c----X
子序列 D 未被订阅,因为它是在子序列 B 失败后发出的。
弹珠图表明子序列C没有及时响应取消信号,这是一个有效的场景。
这是解决此问题的一种方法。下面的实现基于 SelectMany
运算符。所有涉及的可观察序列都使用 Catch
+Empty
组合抑制了错误。错误汇总在 ConcurrentQueue<Exception>
中,并从最终的 Concat
+Defer
组合中抛出。
/// <summary>
/// Projects each element of the source observable sequence to a subsequence,
/// and merges the resulting subsequences into one observable sequence.
/// The merged sequence completes when all the projected subsequences complete
/// on their own. Unlike the SelectMany operator, the subsequences are not
/// unsubscribed when an error occurs.
/// </summary>
public static IObservable<TResult> SelectManyUntilCompletion<TSource, TResult>(
this IObservable<TSource> source,
Func<TSource, CancellationToken, IObservable<TResult>> selector)
{
return Observable.Defer(() =>
{
var cts = new CancellationTokenSource();
var errors = new ConcurrentQueue<Exception>();
var stopSignal = new Subject<Unit>();
var stopSignalSynchronized = Observer.Synchronize(stopSignal);
IObservable<T> HandleErrorReturnEmpty<T>(Exception ex)
{
cts.Cancel();
bool ignoreError = ex is OperationCanceledException
&& cts.IsCancellationRequested;
if (!ignoreError) errors.Enqueue(ex);
stopSignalSynchronized.OnNext(default);
return Observable.Empty<T>();
}
return source
.TakeUntil(stopSignal)
.Catch((Exception ex) => HandleErrorReturnEmpty<TSource>(ex))
.SelectMany(item =>
{
if (!errors.IsEmpty) return Observable.Empty<TResult>();
IObservable<TResult> projected;
try { projected = selector(item, cts.Token); }
catch (Exception ex) { return HandleErrorReturnEmpty<TResult>(ex); }
return projected
.Catch((Exception ex) => HandleErrorReturnEmpty<TResult>(ex));
})
.Concat(Observable.Defer(() =>
{
cts.Dispose();
if (!errors.IsEmpty) throw new AggregateException(errors);
return Observable.Empty<TResult>();
}));
});
}
如果出现错误,停止信号通过同步 Subject<Unit>
传播,并由链接到 source
的 TakeUntil
运算符观察。
用法示例:
//...
.SelectManyUntilCompletion((item, token) => Observable.FromAsync(async () =>
{
//...
}))
//...
我正在使用 SelectMany
运算符,以便将可观察序列的元素投射到任务,并传播这些任务的结果。如果所有操作都成功,一切都很好,但我不喜欢这样,以防出现异常,所有当前 运行 操作都变得一劳永逸。我宁愿等到所有挂起的操作都完成,然后才收到已发生的错误的通知。这是我想避免的行为的最小示例:
try
{
await Observable
.Interval(TimeSpan.FromMilliseconds(100))
.Select(x => (int)x + 1)
.Take(5)
.SelectMany(x => Observable.FromAsync(async ct =>
{
await Task.Delay(500); // Simulate an I/O operation
if (x == 3) throw new ApplicationException("Oops!");
Console.WriteLine($"Operation #{x} completed");
return x;
}))
.Do(x => Console.WriteLine($"Result: {x}"));
}
catch (Exception ex)
{
Console.WriteLine($"Failed, {ex.GetType().Name}: {ex.Message}");
}
await Task.Delay(1000);
输出(不理想):
Operation #1 completed
Result: 1
Operation #2 completed
Result: 2
Failed, ApplicationException: Oops!
Operation #4 completed
Operation #5 completed
理想的输出应如下所示:
Operation #1 completed
Result: 1
Operation #2 completed
Result: 2
Operation #4 completed
Result: 4
Operation #5 completed
Result: 5
Failed, AggregateException: One or more errors occurred. (Oops!)
序列应该仅在所有其他 运行 操作(#4 和 #5)完成后传播项目 #3 的异常。
在上面的例子中,我故意省略了 Observable.FromAsync
方法的 CancellationToken
参数,因为我想模拟启动的异步操作不可取消的情况,或者它们的取消不是瞬时的。
我正在考虑使用以下签名实现自定义运算符 SelectManyUntilCompletion
:
public static IObservable<TResult> SelectManyUntilCompletion<TSource, TResult>(
this IObservable<TSource> source,
Func<TSource, CancellationToken, IObservable<TResult>> selector);
签名与SelectMany
几乎相同。唯一的区别是 selector
委托有一个 CancellationToken
参数。当任何其他子序列中发生错误时,应取消此标记。这个想法是,操作员不应突然取消订阅子序列,而应传达取消信息,但保持订阅子序列直到它们自然完成。
我的问题是:我该如何实现这个运算符?是否可以基于现有的 SelectMany
来实现它,或者我应该使用 Observable.Create
?
下面是运营商的详细说明。如果成功,它的行为应该与 SelectMany
相同,所以我只描述它在错误情况下的行为。
- 当所有子序列都完成时,生成的序列应该完成。
source
序列应该在发生错误后立即取消订阅,这样就不会再创建子序列。CancellationToken
应该在发生错误后立即发出信号。- 结果序列应该传播所有子序列在错误 之前和之后 之后产生的所有
TResult
值。 - 结果序列最终应该传播所有发生的错误,捆绑在
AggregateException
中。这包括source
序列的可能错误。 AggregateException
应该 不 包括任何可能因取消CancellationToken
而发生的OperationCanceledException
。
大理石图:
Source: +-----A-----B-------------C---------------D----|
Subsequence-A: +-------------a---------|
Subsequence-B: +---b---------------------X
Subsequence-C: +-------c----------------c----|
Subsequence-D:
Result: +---------------b---a-------------c----------------c----X
子序列 D 未被订阅,因为它是在子序列 B 失败后发出的。
弹珠图表明子序列C没有及时响应取消信号,这是一个有效的场景。
这是解决此问题的一种方法。下面的实现基于 SelectMany
运算符。所有涉及的可观察序列都使用 Catch
+Empty
组合抑制了错误。错误汇总在 ConcurrentQueue<Exception>
中,并从最终的 Concat
+Defer
组合中抛出。
/// <summary>
/// Projects each element of the source observable sequence to a subsequence,
/// and merges the resulting subsequences into one observable sequence.
/// The merged sequence completes when all the projected subsequences complete
/// on their own. Unlike the SelectMany operator, the subsequences are not
/// unsubscribed when an error occurs.
/// </summary>
public static IObservable<TResult> SelectManyUntilCompletion<TSource, TResult>(
this IObservable<TSource> source,
Func<TSource, CancellationToken, IObservable<TResult>> selector)
{
return Observable.Defer(() =>
{
var cts = new CancellationTokenSource();
var errors = new ConcurrentQueue<Exception>();
var stopSignal = new Subject<Unit>();
var stopSignalSynchronized = Observer.Synchronize(stopSignal);
IObservable<T> HandleErrorReturnEmpty<T>(Exception ex)
{
cts.Cancel();
bool ignoreError = ex is OperationCanceledException
&& cts.IsCancellationRequested;
if (!ignoreError) errors.Enqueue(ex);
stopSignalSynchronized.OnNext(default);
return Observable.Empty<T>();
}
return source
.TakeUntil(stopSignal)
.Catch((Exception ex) => HandleErrorReturnEmpty<TSource>(ex))
.SelectMany(item =>
{
if (!errors.IsEmpty) return Observable.Empty<TResult>();
IObservable<TResult> projected;
try { projected = selector(item, cts.Token); }
catch (Exception ex) { return HandleErrorReturnEmpty<TResult>(ex); }
return projected
.Catch((Exception ex) => HandleErrorReturnEmpty<TResult>(ex));
})
.Concat(Observable.Defer(() =>
{
cts.Dispose();
if (!errors.IsEmpty) throw new AggregateException(errors);
return Observable.Empty<TResult>();
}));
});
}
如果出现错误,停止信号通过同步 Subject<Unit>
传播,并由链接到 source
的 TakeUntil
运算符观察。
用法示例:
//...
.SelectManyUntilCompletion((item, token) => Observable.FromAsync(async () =>
{
//...
}))
//...