为什么代码会产生 ObjectDisposedException

Why the code produce ObjectDisposedException

static void Main(string[] args)
{
    Observable.Using(() => new EventLoopScheduler(), els => Observable
            .Defer(() => Observable.Return(1))
            .SubscribeOn(els))
        .Subscribe();
    Console.ReadLine();
}

为什么上面的代码会产生ObjectDisposedException? SubscribeOn 究竟是如何工作的?


更多详情:

//can't compile
class DataService : ObservableBase<Unit>
{
    protected override IDisposable SubscribeCore(IObserver<Unit> o) 
    {
        return Observable.Defer(() => Observable.Start(() => _httpClient.Get(...)))
            .RepeatWithDelay(TimeSpan.FromSeconds(1))
            .ObserveOn(SchedulerEx.Current)//observe back to event loop
            .Do(...)
            .Select(_ => Unit.Default)
            .Subscribe(o)
    }
}
class Controller 
{
    void Start()
    {
        _instance = Observable.Using(
            () => SchedulerEx.Create(), 
            els => _dataService.SubscribeOn(els));
    }
    void Stop()
    {
        _instance.Dispose();
    }
}
class SchedulerEx 
{
    [ThreadStatic]
    public static EventLoopScheduler Current;
    public EventLoopScheduler Create() 
    {
        var els = new EventLoopScheduler();
        els.Schedule(() => SchedulerEx.Current = els);
        return els;
    } 
}
static void Main() 
{
    var controller = kernel.Get<Controller>();
    controller.Start();
    controller.Stop();//throw if stop immediately
}

我想实现的是:在windows服务应用中,我有多个独立的DataService运行各自的事件循环,我可以在任何地方观察回当前事件循环。

非常确定,因为 .Using()。在内部,它(很可能)继续进行 using () {} 构造,它隐式调用 .Dispose() 自身的末尾。

这真是椒盐卷饼:

  • Observable.Using 类似于 C# using 语句:它在可观察对象结束后处理 IDisposable
  • Observable.Defer 延迟 运行 包含的代码,直到订阅者订阅。
  • SubscribeOn 根据传入的 IScheduler.
  • 指示应在哪个线程上订阅 observable

我测试了下面的,也炸了:

static void Main(string[] args)
{
    Observable.Using(() => new EventLoopScheduler(), els => 
        Observable.Empty<int>()
            .SubscribeOn(els)
    )
        .Subscribe();
}

... 所以 Defer 没有影响。

很明显,Using 正在使用调度程序之前对其进行处理。如果您将 Observable.Empty(或 Observable.Return)更改为不会立即完成的内容(例如 Observable.NeverObservable.Interval(TimeSpan.FromMilliseconds(100)).Take(2),则它不会爆炸。

它看起来像 EventLoopScheduler 中的竞争条件错误:可能 Observable.UsingEventLoopScheduler 都试图调用 Dispose,如果错误的到达那里先炸了

我建议删除 Using

Observable.Using(() => new EventLoopScheduler(), 
            els => new AnonymousObservable<long>(o => els.Schedule(Unit.Default, (_, s) => source.Subscribe(o))));

最后我通过上面的代码解决了问题。

试试这个:

var els = new EventLoopScheduler();
var subscription =
    Observable
        .Return(1)
        .Finally(() => els.Schedule(() => els.Dispose()))
        .ObserveOn(els)
        .Subscribe();

Console.ReadLine();