如何实现等待所有可观察子序列完成的自定义 SelectMany 运算符?

How to implement a custom SelectMany operator that waits for all observable subsequences to complete?

我正在使用 SelectMany 运算符,以便将可观察序列的元素投射到任务,并传播这些任务的结果。如果所有操作都成功,一切都很好,但我不喜欢这样,以防出现异常,所有当前 运行 操作都变得一劳永逸。我宁愿等到所有挂起的操作都完成,然后才收到已发生的错误的通知。这是我想避免的行为的最小示例:

try
{
    await Observable
        .Interval(TimeSpan.FromMilliseconds(100))
        .Select(x => (int)x + 1)
        .Take(5)
        .SelectMany(x => Observable.FromAsync(async ct =>
        {
            await Task.Delay(500); // Simulate an I/O operation
            if (x == 3) throw new ApplicationException("Oops!");
            Console.WriteLine($"Operation #{x} completed");
            return x;
        }))
        .Do(x => Console.WriteLine($"Result: {x}"));
}
catch (Exception ex)
{
    Console.WriteLine($"Failed, {ex.GetType().Name}: {ex.Message}");
}
await Task.Delay(1000);

输出(不理想):

Operation #1 completed
Result: 1
Operation #2 completed
Result: 2
Failed, ApplicationException: Oops!
Operation #4 completed
Operation #5 completed

Try it on Fiddle:

理想的输出应如下所示:

Operation #1 completed
Result: 1
Operation #2 completed
Result: 2
Operation #4 completed
Result: 4
Operation #5 completed
Result: 5
Failed, AggregateException: One or more errors occurred. (Oops!)

序列应该仅在所有其他 运行 操作(#4 和 #5)完成后传播项目 #3 的异常。

在上面的例子中,我故意省略了 Observable.FromAsync 方法的 CancellationToken 参数,因为我想模拟启动的异步操作不可取消的情况,或者它们的取消不是瞬时的。

我正在考虑使用以下签名实现自定义运算符 SelectManyUntilCompletion

public static IObservable<TResult> SelectManyUntilCompletion<TSource, TResult>(
    this IObservable<TSource> source,
    Func<TSource, CancellationToken, IObservable<TResult>> selector);

签名与SelectMany几乎相同。唯一的区别是 selector 委托有一个 CancellationToken 参数。当任何其他子序列中发生错误时,应取消此标记。这个想法是,操作员不应突然取消订阅子序列,而应传达取消信息,但保持订阅子序列直到它们自然完成。

我的问题是:我该如何实现这个运算符?是否可以基于现有的 SelectMany 来实现它,或者我应该使用 Observable.Create?

在较低的级别上实现它

下面是运营商的详细说明。如果成功,它的行为应该与 SelectMany 相同,所以我只描述它在错误情况下的行为。

  1. 当所有子序列都完成时,生成的序列应该完成。
  2. source 序列应该在发生错误后立即取消订阅,这样就不会再创建子序列。
  3. CancellationToken 应该在发生错误后立即发出信号。
  4. 结果序列应该传播所有子序列在错误 之前和之后 之后产生的所有 TResult 值。
  5. 结果序列最终应该传播所有发生的错误,捆绑在 AggregateException 中。这包括 source 序列的可能错误。
  6. AggregateException 应该 包括任何可能因取消 CancellationToken 而发生的 OperationCanceledException

大理石图:

Source:        +-----A-----B-------------C---------------D----|
Subsequence-A:       +-------------a---------|
Subsequence-B:             +---b---------------------X
Subsequence-C:                           +-------c----------------c----|
Subsequence-D:
Result:        +---------------b---a-------------c----------------c----X

子序列 D 未被订阅,因为它是在子序列 B 失败后发出的。

弹珠图表明子序列C没有及时响应取消信号,这是一个有效的场景。

这是解决此问题的一种方法。下面的实现基于 SelectMany 运算符。所有涉及的可观察序列都使用 Catch+Empty 组合抑制了错误。错误汇总在 ConcurrentQueue<Exception> 中,并从最终的 Concat+Defer 组合中抛出。

/// <summary>
/// Projects each element of the source observable sequence to a subsequence,
/// and merges the resulting subsequences into one observable sequence.
/// The merged sequence completes when all the projected subsequences complete
/// on their own. Unlike the SelectMany operator, the subsequences are not
/// unsubscribed when an error occurs.
/// </summary>
public static IObservable<TResult> SelectManyUntilCompletion<TSource, TResult>(
    this IObservable<TSource> source,
    Func<TSource, CancellationToken, IObservable<TResult>> selector)
{
    return Observable.Defer(() =>
    {
        var cts = new CancellationTokenSource();
        var errors = new ConcurrentQueue<Exception>();
        var stopSignal = new Subject<Unit>();
        var stopSignalSynchronized = Observer.Synchronize(stopSignal);
            
        IObservable<T> HandleErrorReturnEmpty<T>(Exception ex)
        {
            cts.Cancel();
            bool ignoreError = ex is OperationCanceledException
                && cts.IsCancellationRequested;
            if (!ignoreError) errors.Enqueue(ex);
            stopSignalSynchronized.OnNext(default);
            return Observable.Empty<T>();
        }

        return source
            .TakeUntil(stopSignal)
            .Catch((Exception ex) => HandleErrorReturnEmpty<TSource>(ex))
            .SelectMany(item =>
            {
                if (!errors.IsEmpty) return Observable.Empty<TResult>();
                IObservable<TResult> projected;
                try { projected = selector(item, cts.Token); }
                catch (Exception ex) { return HandleErrorReturnEmpty<TResult>(ex); }
                return projected
                    .Catch((Exception ex) => HandleErrorReturnEmpty<TResult>(ex));
            })
            .Concat(Observable.Defer(() =>
            {
                cts.Dispose();
                if (!errors.IsEmpty) throw new AggregateException(errors);
                return Observable.Empty<TResult>();
            }));
    });
}

如果出现错误,停止信号通过同步 Subject<Unit> 传播,并由链接到 sourceTakeUntil 运算符观察。

用法示例:

//...
.SelectManyUntilCompletion((item, token) => Observable.FromAsync(async () =>
{
    //...
}))
//...