Observable.Repeat 势不可挡,是 bug 还是 feature?

The Observable.Repeat is unstoppable, is it a bug or a feature?

我注意到 Repeat operator, when the source observable's notifications are synchronous. The resulting observable cannot be stopped with a subsequent TakeWhile 运算符的行为有些奇怪,并且显然会一直持续 运行。为了演示,我创建了一个生成单个值的源可观察对象,该值在每次订阅时递增。第一个订阅者获得值 1,第二个订阅者获得值 2 等等:

int incrementalValue = 0;
var incremental = Observable.Create<int>(async o =>
{
    await Task.CompletedTask;
    //await Task.Yield();

    Thread.Sleep(100);
    var value = Interlocked.Increment(ref incrementalValue);
    o.OnNext(value);
    o.OnCompleted();
});

然后我将运算符 RepeatTakeWhileLastAsync 附加到这个 observable,这样程序将等待组合的 observable 产生它的最后一个值:

incremental.Repeat()
    .Do(new CustomObserver("Checkpoint A"))
    .TakeWhile(item => item <= 5)
    .Do(new CustomObserver("Checkpoint B"))
    .LastAsync()
    .Do(new CustomObserver("Checkpoint C"))
    .Wait();
Console.WriteLine($"Done");

class CustomObserver : IObserver<int>
{
    private readonly string _name;
    public CustomObserver(string name) => _name = name;
    public void OnNext(int value) => Console.WriteLine($"{_name}: {value}");
    public void OnError(Exception ex) => Console.WriteLine($"{_name}: {ex.Message}");
    public void OnCompleted() => Console.WriteLine($"{_name}: Completed");
}

这是这个程序的输出:

Checkpoint A: 1
Checkpoint B: 1
Checkpoint A: 2
Checkpoint B: 2
Checkpoint A: 3
Checkpoint B: 3
Checkpoint A: 4
Checkpoint B: 4
Checkpoint A: 5
Checkpoint B: 5
Checkpoint A: 6
Checkpoint B: Completed
Checkpoint C: 5
Checkpoint C: Completed
Checkpoint A: 7
Checkpoint A: 8
Checkpoint A: 9
Checkpoint A: 10
Checkpoint A: 11
Checkpoint A: 12
Checkpoint A: 13
Checkpoint A: 14
Checkpoint A: 15
Checkpoint A: 16
Checkpoint A: 17
...

它永远不会结束!虽然 LastAsync 已经产生了它的值并完成了,但是 Repeat 运算符还在不停地旋转!

只有当源可观察对象同步通知其订阅者时才会发生这种情况。例如,取消注释行 //await Task.Yield(); 后,程序的行为符合预期:

Checkpoint A: 1
Checkpoint B: 1
Checkpoint A: 2
Checkpoint B: 2
Checkpoint A: 3
Checkpoint B: 3
Checkpoint A: 4
Checkpoint B: 4
Checkpoint A: 5
Checkpoint B: 5
Checkpoint A: 6
Checkpoint B: Completed
Checkpoint C: 5
Checkpoint C: Completed
Done

Repeat 运算符停止旋转,尽管它没有报告完成(我猜它已被取消订阅)。

有没有什么方法可以使 Repeat 运算符的行为保持一致,而不管它收到的通知类型(同步或异步)如何?

.NET Core 3.0,C# 8,System.Reactive 4.3.2,控制台应用程序

您可能希望 Repeat 的实现以 OnCompleted 通知为特色,但它在 Concat-ing 无限流方面变成 it's implemented

    public static IObservable<TSource> Repeat<TSource>(this IObservable<TSource> source) =>
        RepeatInfinite(source).Concat();

    private static IEnumerable<T> RepeatInfinite<T>(T value)
    {
        while (true)
        {
            yield return value;
        }
    }

随着责任转移到 Concat - 我们可以创建一个简化版本(血淋淋的实施细节在 TailRecursiveSink.cs 中)。除非 await Task.Yield().

提供了不同的执行上下文,否则它仍然会继续旋转
public static IObservable<T> ConcatEx<T>(this IEnumerable<IObservable<T>> enumerable) =>
    Observable.Create<T>(observer =>
    {
        var check = new BooleanDisposable();

        IDisposable loopRec(IScheduler inner, IEnumerator<IObservable<T>> enumerator)
        {
            if (check.IsDisposed)
                return Disposable.Empty;

            if (enumerator.MoveNext()) //this never returns false
                return enumerator.Current.Subscribe(
                    observer.OnNext,
                    () => inner.Schedule(enumerator, loopRec) //<-- starts next immediately
                );
            else
                return inner.Schedule(observer.OnCompleted); //this never runs
        }

        Scheduler.Immediate.Schedule(enumerable.GetEnumerator(), loopRec); //this runs forever
        return check;
    });

作为无限流,enumerator.MoveNext() 始终 returns 为真,因此另一个分支永远不会运行 - 这是预期的;这不是我们的问题。

当调用o.OnCompleted()时,它会立即安排下一个迭代循环 Schedule(enumerator, loopRec) 同步调用下一个 o.OnCompleted(),并且它无限地继续 - 没有任何地方可以逃脱此递归。

如果用await Task.Yield()进行上下文切换,那么Schedule(enumerator, loopRec)会立即退出,o.OnCompleted()会被非同步调用

RepeatConcat 使用当前线程在不更改上下文的情况下工作 - 这不是不正确的行为,但是当相同的上下文也用于推送通知时,它可能会导致僵局或陷入困境 永久蹦床。

带注释的调用堆栈

[External Code] 
Main.AnonymousMethod__0(o) //o.OnCompleted();
[External Code] 
ConcatEx.__loopRec|1(inner, enumerator) //return enumerator.Current.Subscribe(...)
[External Code] 
ConcatEx.AnonymousMethod__2() //inner.Schedule(enumerator, loopRec)
[External Code] 
Main.AnonymousMethod__0(o) //o.OnCompleted();
[External Code] 
ConcatEx.__loopRec|1(inner, enumerator) //return enumerator.Current.Subscribe(...)
[External Code] 
ConcatEx.AnonymousMethod__0(observer) //Scheduler.Immediate.Schedule(...)
[External Code] 
Main(args) //incremental.RepeatEx()...