为什么 Observable.ToEnumerable() 在基础序列完成之前不产生值?
Why Observable.ToEnumerable() does not produce values until underlying sequence completes?
运行以下代码
foreach(var i in
Observable
.Range(1, 3)
.Do(Console.WriteLine)
.ToEnumerable())
Console.WriteLine("Fin:" + i);
我得到这个输出:
1
2
3
Fin:1
Fin:2
Fin:3
问题是 - 为什么 ToEnumerable 缓存所有值并在源序列完成后立即提供它们?
它与“离开 monad”有某种关系吗?
ToEnumerable
不会等待 observable 完成,在这种情况下,observable 恰好同步完成。 Observable.Interval
显示:
var enumerable =
Observable
.Interval(TimeSpan.FromMilliseconds(100))
.Take(3)
.Do(i => Console.WriteLine("obs: {0}", i))
.ToEnumerable();
foreach (int value in enumerable)
{
Console.WriteLine(value);
}
如果你深入研究 Rx 库的 source code,你会发现 ToEnumerable
运算符基本上是这样实现的:
public static IEnumerable<T> ToEnumerable<T>(this IObservable<T> source)
{
using var enumerator = new GetEnumerator<T>();
enumerator.Run(source);
while (enumerator.MoveNext()) yield return enumerator.Current;
}
...其中 GetEnumerator<T>
是 this 文件中定义的 class。这个class是一个IEnumerator<T>
和一个IObserver<T>
。它有一个内部 _queue
(ConcurrentQueue<T>
) 用于存储收到的项目。最有趣的方法是 Run
、OnNext
和 MoveNext
:
public IEnumerator<T> Run(IObservable<T> source)
{
_subscription.Disposable = source.Subscribe(this);
return this;
}
public void OnNext(T value)
{
_queue.Enqueue(value);
_gate.Release();
}
public bool MoveNext()
{
_gate.Wait();
if (_queue.TryDequeue(out _current)) return true;
_error?.Throw();
return false;
}
在您的代码中,当您启动 foreach
循环时,Run
方法运行,并且订阅 Range+Do
序列。此序列在订阅期间发出其所有元素。 OnNext
方法为每个发出的元素调用,因此所有元素都在 _queue
中排队。 Run
方法完成后,执行 while
循环,使排队的元素出队并产生。这就是为什么您会在 foreach
循环的任何迭代之前看到 Do
运算符的所有副作用。
Rx 库包含另一个类似于 ToEnumerable
的运算符,Next
运算符,具有此签名:
// Returns an enumerable sequence whose enumeration blocks until the next element
// in the source observable sequence becomes available. Enumerators on the resulting
// sequence will block until the next element becomes available.
public static IEnumerable<T> Next<T>(this IObservable<T> source);
根据我的过期,这个操作员也没有做你想做的事。
运行以下代码
foreach(var i in
Observable
.Range(1, 3)
.Do(Console.WriteLine)
.ToEnumerable())
Console.WriteLine("Fin:" + i);
我得到这个输出:
1
2
3
Fin:1
Fin:2
Fin:3
问题是 - 为什么 ToEnumerable 缓存所有值并在源序列完成后立即提供它们? 它与“离开 monad”有某种关系吗?
ToEnumerable
不会等待 observable 完成,在这种情况下,observable 恰好同步完成。 Observable.Interval
显示:
var enumerable =
Observable
.Interval(TimeSpan.FromMilliseconds(100))
.Take(3)
.Do(i => Console.WriteLine("obs: {0}", i))
.ToEnumerable();
foreach (int value in enumerable)
{
Console.WriteLine(value);
}
如果你深入研究 Rx 库的 source code,你会发现 ToEnumerable
运算符基本上是这样实现的:
public static IEnumerable<T> ToEnumerable<T>(this IObservable<T> source)
{
using var enumerator = new GetEnumerator<T>();
enumerator.Run(source);
while (enumerator.MoveNext()) yield return enumerator.Current;
}
...其中 GetEnumerator<T>
是 this 文件中定义的 class。这个class是一个IEnumerator<T>
和一个IObserver<T>
。它有一个内部 _queue
(ConcurrentQueue<T>
) 用于存储收到的项目。最有趣的方法是 Run
、OnNext
和 MoveNext
:
public IEnumerator<T> Run(IObservable<T> source)
{
_subscription.Disposable = source.Subscribe(this);
return this;
}
public void OnNext(T value)
{
_queue.Enqueue(value);
_gate.Release();
}
public bool MoveNext()
{
_gate.Wait();
if (_queue.TryDequeue(out _current)) return true;
_error?.Throw();
return false;
}
在您的代码中,当您启动 foreach
循环时,Run
方法运行,并且订阅 Range+Do
序列。此序列在订阅期间发出其所有元素。 OnNext
方法为每个发出的元素调用,因此所有元素都在 _queue
中排队。 Run
方法完成后,执行 while
循环,使排队的元素出队并产生。这就是为什么您会在 foreach
循环的任何迭代之前看到 Do
运算符的所有副作用。
Rx 库包含另一个类似于 ToEnumerable
的运算符,Next
运算符,具有此签名:
// Returns an enumerable sequence whose enumeration blocks until the next element
// in the source observable sequence becomes available. Enumerators on the resulting
// sequence will block until the next element becomes available.
public static IEnumerable<T> Next<T>(this IObservable<T> source);
根据我的过期,这个操作员也没有做你想做的事。