为什么 Observable.ToEnumerable() 在基础序列完成之前不产生值?

Why Observable.ToEnumerable() does not produce values until underlying sequence completes?

运行以下代码

    foreach(var i in
    Observable
        .Range(1, 3)
        .Do(Console.WriteLine)
        .ToEnumerable())
        Console.WriteLine("Fin:" + i);

我得到这个输出:

1
2
3
Fin:1
Fin:2
Fin:3

问题是 - 为什么 ToEnumerable 缓存所有值并在源序列完成后立即提供它们? 它与“离开 monad”有某种关系吗?

ToEnumerable 不会等待 observable 完成,在这种情况下,observable 恰好同步完成。 Observable.Interval 显示:

var enumerable =
    Observable
        .Interval(TimeSpan.FromMilliseconds(100))
        .Take(3)
        .Do(i => Console.WriteLine("obs: {0}", i))
        .ToEnumerable();

foreach (int value in enumerable)
{
    Console.WriteLine(value);
}

如果你深入研究 Rx 库的 source code,你会发现 ToEnumerable 运算符基本上是这样实现的:

public static IEnumerable<T> ToEnumerable<T>(this IObservable<T> source)
{
    using var enumerator = new GetEnumerator<T>();
    enumerator.Run(source);
    while (enumerator.MoveNext()) yield return enumerator.Current;
}

...其中 GetEnumerator<T>this 文件中定义的 class。这个class是一个IEnumerator<T>和一个IObserver<T>。它有一个内部 _queue (ConcurrentQueue<T>) 用于存储收到的项目。最有趣的方法是 RunOnNextMoveNext:

public IEnumerator<T> Run(IObservable<T> source)
{
    _subscription.Disposable = source.Subscribe(this);
    return this;
}

public void OnNext(T value)
{
    _queue.Enqueue(value);
    _gate.Release();
}

public bool MoveNext()
{
    _gate.Wait();
    if (_queue.TryDequeue(out _current)) return true;
    _error?.Throw();
    return false;
}

在您的代码中,当您启动 foreach 循环时,Run 方法运行,并且订阅 Range+Do 序列。此序列在订阅期间发出其所有元素。 OnNext 方法为每个发出的元素调用,因此所有元素都在 _queue 中排队。 Run 方法完成后,执行 while 循环,使排队的元素出队并产生。这就是为什么您会在 foreach 循环的任何迭代之前看到 Do 运算符的所有副作用。

Rx 库包含另一个类似于 ToEnumerable 的运算符,Next 运算符,具有此签名:

// Returns an enumerable sequence whose enumeration blocks until the next element
// in the source observable sequence becomes available. Enumerators on the resulting
// sequence will block until the next element becomes available.
public static IEnumerable<T> Next<T>(this IObservable<T> source);

根据我的过期,这个操作员也没有做你想做的事。