Reactive Extensions ToEnumerable 如果不迭代所有内容,如何处理可观察状态

Reactive Extensions ToEnumerable how to dispose observable state if not iterating everything

如果我在 IObservable 上使用 ToEnumerable 扩展,用户可能不会重复所有元素。那么在Observable.Create中声明的IDisposable如何妥善处理呢?

为了争论起见,我们假设直接 return 和 IObservable 给用户不是一个选项(在这种情况下,用户可以自己实现取消)。

 private IObservable<Object> MakeObservable()
 {
   return Observable.Create(async (observer, cancelToken) =>
   {
     using(SomeDisposable somedisposable = new SomeDisposable())
     {
        while(true)
        {
          Object result = somedisposable.GetNextObject();
          if(result == null)
          {
            break;
          }
          observer.OnNext(result);
        }       
     }
   }
 }

 public IEnumerable<Object> GetObjects()
 {
   return MakeObservable().ToEnumerable();
 }

 public void Test()
 {
   IEnumerable<Object> e = GetObjects();
   int i = 0;
   foreach(Object o in e)
   {
     if(i++ == 10)
        break;
   }
   //somedisposable is not disposed here!!!
 }

问题是您对 Create 的定义没有 return,因此无法停止。如果您将可观察到的源更改为基于计时器的东西,那么它就可以正常工作。试试这个代码:

public IEnumerable<long> GetObjects()
{
    return Observable
        .Interval(TimeSpan.FromSeconds(1.0))
        .Finally(() => Console.WriteLine("Done."))
        .ToEnumerable();
}

public void Test()
{
    foreach (long i in GetObjects())
    {
        Console.WriteLine(i);
        if (i == 10)
        {
            break;
        }
    }
}

当你运行你得到这个输出:

0
1
2
3
4
5
6
7
8
9
10
Done.

很明显是在源 Observable 上调用 OnCompleted