C# Rx 如何在创建的 Observable 中正确处理源 Enumerable

C# Rx How to properly dispose of source Enumerable in created Observable

我想将 IEnumerable、IDisposable(源)改编成 Observable,并想知道执行此操作的最佳方法,并希望在取消订阅时调用 source.Dispose 方法。

在introtorx.com上有一个example适配IEnumerable,但是它明确指出它有很多缺点,例如处理模式不正确,并发模型差,没有错误处理等。 .并且内置版本可以处理这些。但是内置版本似乎不会在取消订阅时在源 IEnumerable 上调用 Dispose。

理想情况下,我想使用 .Publish().RefCount() 模式在同一个源上拥有多个订阅者,并且只有当他们都取消订阅时才调用源 Dispose()

这是我尝试的代码,但它不起作用。

static void FromEnumerableTest() {
    var observable = Observable.Create<int>(
        observer => {
            var source = new JunkEnumerable();
            foreach (int i in source) {
                observer.OnNext(i);
            }
            return () => {
                source.Dispose();
            };
        })
        .SubscribeOn(Scheduler.Default)
        .Do(i => Console.WriteLine("Publishing {0}", i))    // side effect to show it is running
        .Publish()
        .RefCount();

    //var observable = Observable.ToObservable(new JunkEnumerable())
    //    .SubscribeOn(Scheduler.Default)
    //    .Do(i => Console.WriteLine("Publishing {0}", i))    // side effect to show it is running
    //    .Publish()
    //    .RefCount();

    Console.WriteLine("Press any key to subscribe");
    Console.ReadKey();

    var subscription = observable.Subscribe(i => Console.WriteLine("subscription : {0}", i));
    Console.WriteLine("Press any key to unsubscribe");
    Console.ReadKey();
    subscription.Dispose();

    Console.WriteLine("Press any key to exit");
    Console.ReadKey();
}


class JunkEnumerable : IEnumerable<int>, IDisposable {
    public void Dispose() { Console.WriteLine("JunkEnumerable.Dispose invoked"); }

    public IEnumerator<int> GetEnumerator() { return new Enumerator(); }

    IEnumerator IEnumerable.GetEnumerator() { return GetEnumerator(); }

    class Enumerator : IEnumerator<int> {
        private int counter = 0;
        public int Current {
            get {
                Thread.Sleep(1000);
                return counter++;
            }
        }

        object IEnumerator.Current { get { return Current; } }

        public void Dispose() { Console.WriteLine("JunkEnumerable.Enumerator.Dispose invoked"); }

        public bool MoveNext() { return true; }

        public void Reset() { counter = 0; }
    }
}

Rx 订阅生命周期分为三个阶段:

  1. 订阅
  2. 观察
  3. 退订

如果订阅从未完成,则不会出现取消订阅代码。毕竟,如果您从未完全订阅过,为什么还需要退订呢?你的示例代码在订阅代码中有一个无限循环,所以它永远不会完成,所以取消订阅代码永远不会发生。

处理 IDisposable 的正常方法是使用 Observable.Using。处理 IEnumerable 的正常方法是使用 .ToObservable。如果您尝试将异步引入同步的可枚举代码(如您的示例),您可以按如下方式进行:

var observable = Observable.Using(() => new JunkEnumerable(), junk => 
    Observable.Generate(junk.GetEnumerator(), e => e.MoveNext(), e => e, e => e.Current, e => TimeSpan.FromMilliseconds(20))
);

只要TimeSpan大于15millis,Rx就会转async,完成订阅。后续数值为观察阶段的一部分,退订将全部进行。

这是一个 运行 指定调度程序枚举的运算符。 我们安排可枚举的每个枚举,以便一次性可以正确 return.

    public static IObservable<T> ToObservableOn<T>(this IEnumerable<T> source, IScheduler scheduler = default(IScheduler))
    {
        scheduler = scheduler ?? Scheduler.Default;
        return Observable.Create<T>(
            (observer) =>
            {
                var disposed = new BooleanDisposable();
                var enumerator = source.GetEnumerator();

                Action scheduleNext = default(Action);
                scheduleNext = () =>
                {
                    if (disposed.IsDisposed)
                        return;

                    if (!enumerator.MoveNext())
                    {
                        observer.OnCompleted();
                        return;
                    }

                    observer.OnNext(enumerator.Current);

                    scheduler.Schedule(scheduleNext);
                };

                scheduler.Schedule(scheduleNext);
                return StableCompositeDisposable.Create(disposed, enumerator);
            });
    }

根据您的示例,我们只需将 SubscribeOn 更改为:

        var observable = 
            new JunkEnumerable()
            .ToObservableOn(Scheduler.Default)                
            .Do(i => Console.WriteLine("Publishing {0}", i))    // side effect to show it is running
            .Publish()
            .RefCount();