Reactive Extensions ToEnumerable 如果不迭代所有内容,如何处理可观察状态
Reactive Extensions ToEnumerable how to dispose observable state if not iterating everything
如果我在 IObservable
上使用 ToEnumerable
扩展,用户可能不会重复所有元素。那么在Observable.Create
中声明的IDisposable
如何妥善处理呢?
为了争论起见,我们假设直接 return 和 IObservable
给用户不是一个选项(在这种情况下,用户可以自己实现取消)。
private IObservable<Object> MakeObservable()
{
return Observable.Create(async (observer, cancelToken) =>
{
using(SomeDisposable somedisposable = new SomeDisposable())
{
while(true)
{
Object result = somedisposable.GetNextObject();
if(result == null)
{
break;
}
observer.OnNext(result);
}
}
}
}
public IEnumerable<Object> GetObjects()
{
return MakeObservable().ToEnumerable();
}
public void Test()
{
IEnumerable<Object> e = GetObjects();
int i = 0;
foreach(Object o in e)
{
if(i++ == 10)
break;
}
//somedisposable is not disposed here!!!
}
问题是您对 Create
的定义没有 return,因此无法停止。如果您将可观察到的源更改为基于计时器的东西,那么它就可以正常工作。试试这个代码:
public IEnumerable<long> GetObjects()
{
return Observable
.Interval(TimeSpan.FromSeconds(1.0))
.Finally(() => Console.WriteLine("Done."))
.ToEnumerable();
}
public void Test()
{
foreach (long i in GetObjects())
{
Console.WriteLine(i);
if (i == 10)
{
break;
}
}
}
当你运行你得到这个输出:
0
1
2
3
4
5
6
7
8
9
10
Done.
很明显是在源 Observable 上调用 OnCompleted
。
如果我在 IObservable
上使用 ToEnumerable
扩展,用户可能不会重复所有元素。那么在Observable.Create
中声明的IDisposable
如何妥善处理呢?
为了争论起见,我们假设直接 return 和 IObservable
给用户不是一个选项(在这种情况下,用户可以自己实现取消)。
private IObservable<Object> MakeObservable()
{
return Observable.Create(async (observer, cancelToken) =>
{
using(SomeDisposable somedisposable = new SomeDisposable())
{
while(true)
{
Object result = somedisposable.GetNextObject();
if(result == null)
{
break;
}
observer.OnNext(result);
}
}
}
}
public IEnumerable<Object> GetObjects()
{
return MakeObservable().ToEnumerable();
}
public void Test()
{
IEnumerable<Object> e = GetObjects();
int i = 0;
foreach(Object o in e)
{
if(i++ == 10)
break;
}
//somedisposable is not disposed here!!!
}
问题是您对 Create
的定义没有 return,因此无法停止。如果您将可观察到的源更改为基于计时器的东西,那么它就可以正常工作。试试这个代码:
public IEnumerable<long> GetObjects()
{
return Observable
.Interval(TimeSpan.FromSeconds(1.0))
.Finally(() => Console.WriteLine("Done."))
.ToEnumerable();
}
public void Test()
{
foreach (long i in GetObjects())
{
Console.WriteLine(i);
if (i == 10)
{
break;
}
}
}
当你运行你得到这个输出:
0 1 2 3 4 5 6 7 8 9 10 Done.
很明显是在源 Observable 上调用 OnCompleted
。