C# Rx 如何在创建的 Observable 中正确处理源 Enumerable
C# Rx How to properly dispose of source Enumerable in created Observable
我想将 IEnumerable、IDisposable(源)改编成 Observable,并想知道执行此操作的最佳方法,并希望在取消订阅时调用 source.Dispose 方法。
在introtorx.com上有一个example适配IEnumerable,但是它明确指出它有很多缺点,例如处理模式不正确,并发模型差,没有错误处理等。 .并且内置版本可以处理这些。但是内置版本似乎不会在取消订阅时在源 IEnumerable 上调用 Dispose。
理想情况下,我想使用 .Publish().RefCount()
模式在同一个源上拥有多个订阅者,并且只有当他们都取消订阅时才调用源 Dispose()
。
这是我尝试的代码,但它不起作用。
static void FromEnumerableTest() {
var observable = Observable.Create<int>(
observer => {
var source = new JunkEnumerable();
foreach (int i in source) {
observer.OnNext(i);
}
return () => {
source.Dispose();
};
})
.SubscribeOn(Scheduler.Default)
.Do(i => Console.WriteLine("Publishing {0}", i)) // side effect to show it is running
.Publish()
.RefCount();
//var observable = Observable.ToObservable(new JunkEnumerable())
// .SubscribeOn(Scheduler.Default)
// .Do(i => Console.WriteLine("Publishing {0}", i)) // side effect to show it is running
// .Publish()
// .RefCount();
Console.WriteLine("Press any key to subscribe");
Console.ReadKey();
var subscription = observable.Subscribe(i => Console.WriteLine("subscription : {0}", i));
Console.WriteLine("Press any key to unsubscribe");
Console.ReadKey();
subscription.Dispose();
Console.WriteLine("Press any key to exit");
Console.ReadKey();
}
class JunkEnumerable : IEnumerable<int>, IDisposable {
public void Dispose() { Console.WriteLine("JunkEnumerable.Dispose invoked"); }
public IEnumerator<int> GetEnumerator() { return new Enumerator(); }
IEnumerator IEnumerable.GetEnumerator() { return GetEnumerator(); }
class Enumerator : IEnumerator<int> {
private int counter = 0;
public int Current {
get {
Thread.Sleep(1000);
return counter++;
}
}
object IEnumerator.Current { get { return Current; } }
public void Dispose() { Console.WriteLine("JunkEnumerable.Enumerator.Dispose invoked"); }
public bool MoveNext() { return true; }
public void Reset() { counter = 0; }
}
}
Rx 订阅生命周期分为三个阶段:
- 订阅
- 观察
- 退订
如果订阅从未完成,则不会出现取消订阅代码。毕竟,如果您从未完全订阅过,为什么还需要退订呢?你的示例代码在订阅代码中有一个无限循环,所以它永远不会完成,所以取消订阅代码永远不会发生。
处理 IDisposable
的正常方法是使用 Observable.Using
。处理 IEnumerable
的正常方法是使用 .ToObservable
。如果您尝试将异步引入同步的可枚举代码(如您的示例),您可以按如下方式进行:
var observable = Observable.Using(() => new JunkEnumerable(), junk =>
Observable.Generate(junk.GetEnumerator(), e => e.MoveNext(), e => e, e => e.Current, e => TimeSpan.FromMilliseconds(20))
);
只要TimeSpan大于15millis,Rx就会转async,完成订阅。后续数值为观察阶段的一部分,退订将全部进行。
这是一个 运行 指定调度程序枚举的运算符。
我们安排可枚举的每个枚举,以便一次性可以正确 return.
public static IObservable<T> ToObservableOn<T>(this IEnumerable<T> source, IScheduler scheduler = default(IScheduler))
{
scheduler = scheduler ?? Scheduler.Default;
return Observable.Create<T>(
(observer) =>
{
var disposed = new BooleanDisposable();
var enumerator = source.GetEnumerator();
Action scheduleNext = default(Action);
scheduleNext = () =>
{
if (disposed.IsDisposed)
return;
if (!enumerator.MoveNext())
{
observer.OnCompleted();
return;
}
observer.OnNext(enumerator.Current);
scheduler.Schedule(scheduleNext);
};
scheduler.Schedule(scheduleNext);
return StableCompositeDisposable.Create(disposed, enumerator);
});
}
根据您的示例,我们只需将 SubscribeOn
更改为:
var observable =
new JunkEnumerable()
.ToObservableOn(Scheduler.Default)
.Do(i => Console.WriteLine("Publishing {0}", i)) // side effect to show it is running
.Publish()
.RefCount();
我想将 IEnumerable、IDisposable(源)改编成 Observable,并想知道执行此操作的最佳方法,并希望在取消订阅时调用 source.Dispose 方法。
在introtorx.com上有一个example适配IEnumerable,但是它明确指出它有很多缺点,例如处理模式不正确,并发模型差,没有错误处理等。 .并且内置版本可以处理这些。但是内置版本似乎不会在取消订阅时在源 IEnumerable 上调用 Dispose。
理想情况下,我想使用 .Publish().RefCount()
模式在同一个源上拥有多个订阅者,并且只有当他们都取消订阅时才调用源 Dispose()
。
这是我尝试的代码,但它不起作用。
static void FromEnumerableTest() {
var observable = Observable.Create<int>(
observer => {
var source = new JunkEnumerable();
foreach (int i in source) {
observer.OnNext(i);
}
return () => {
source.Dispose();
};
})
.SubscribeOn(Scheduler.Default)
.Do(i => Console.WriteLine("Publishing {0}", i)) // side effect to show it is running
.Publish()
.RefCount();
//var observable = Observable.ToObservable(new JunkEnumerable())
// .SubscribeOn(Scheduler.Default)
// .Do(i => Console.WriteLine("Publishing {0}", i)) // side effect to show it is running
// .Publish()
// .RefCount();
Console.WriteLine("Press any key to subscribe");
Console.ReadKey();
var subscription = observable.Subscribe(i => Console.WriteLine("subscription : {0}", i));
Console.WriteLine("Press any key to unsubscribe");
Console.ReadKey();
subscription.Dispose();
Console.WriteLine("Press any key to exit");
Console.ReadKey();
}
class JunkEnumerable : IEnumerable<int>, IDisposable {
public void Dispose() { Console.WriteLine("JunkEnumerable.Dispose invoked"); }
public IEnumerator<int> GetEnumerator() { return new Enumerator(); }
IEnumerator IEnumerable.GetEnumerator() { return GetEnumerator(); }
class Enumerator : IEnumerator<int> {
private int counter = 0;
public int Current {
get {
Thread.Sleep(1000);
return counter++;
}
}
object IEnumerator.Current { get { return Current; } }
public void Dispose() { Console.WriteLine("JunkEnumerable.Enumerator.Dispose invoked"); }
public bool MoveNext() { return true; }
public void Reset() { counter = 0; }
}
}
Rx 订阅生命周期分为三个阶段:
- 订阅
- 观察
- 退订
如果订阅从未完成,则不会出现取消订阅代码。毕竟,如果您从未完全订阅过,为什么还需要退订呢?你的示例代码在订阅代码中有一个无限循环,所以它永远不会完成,所以取消订阅代码永远不会发生。
处理 IDisposable
的正常方法是使用 Observable.Using
。处理 IEnumerable
的正常方法是使用 .ToObservable
。如果您尝试将异步引入同步的可枚举代码(如您的示例),您可以按如下方式进行:
var observable = Observable.Using(() => new JunkEnumerable(), junk =>
Observable.Generate(junk.GetEnumerator(), e => e.MoveNext(), e => e, e => e.Current, e => TimeSpan.FromMilliseconds(20))
);
只要TimeSpan大于15millis,Rx就会转async,完成订阅。后续数值为观察阶段的一部分,退订将全部进行。
这是一个 运行 指定调度程序枚举的运算符。 我们安排可枚举的每个枚举,以便一次性可以正确 return.
public static IObservable<T> ToObservableOn<T>(this IEnumerable<T> source, IScheduler scheduler = default(IScheduler))
{
scheduler = scheduler ?? Scheduler.Default;
return Observable.Create<T>(
(observer) =>
{
var disposed = new BooleanDisposable();
var enumerator = source.GetEnumerator();
Action scheduleNext = default(Action);
scheduleNext = () =>
{
if (disposed.IsDisposed)
return;
if (!enumerator.MoveNext())
{
observer.OnCompleted();
return;
}
observer.OnNext(enumerator.Current);
scheduler.Schedule(scheduleNext);
};
scheduler.Schedule(scheduleNext);
return StableCompositeDisposable.Create(disposed, enumerator);
});
}
根据您的示例,我们只需将 SubscribeOn
更改为:
var observable =
new JunkEnumerable()
.ToObservableOn(Scheduler.Default)
.Do(i => Console.WriteLine("Publishing {0}", i)) // side effect to show it is running
.Publish()
.RefCount();