EventLoopScheduler:Dispose ObjectDisposedException 上的意外行为

EventLoopScheduler: unexpected behavior on Dispose ObjectDisposedException

当在 EventLoopScheduler 上调用 Dispose(其工作队列中至少有一个项目)时,它将抛出 ObjectDisposedException。异常从其工作线程中抛出。

我已经看到并阅读了两个已经存在的问题:

但是,我认为有些答案不太正确,引用 Intro to Rx regarding EventLoopScheduler:

The EventLoopScheduler implements IDisposable, and calling Dispose will allow the thread to terminate. As with any implementation of IDisposable, it is appropriate that you explicitly manage the lifetime of the resources you create.

来源:http://introtorx.com/Content/v1.0.10621.0/15_SchedulingAndThreading.html#EventLoopScheduler

他们提供了有关如何正确使用 EventLoopScheduler 的示例:

Observable
    .Using(()=>new EventLoopScheduler(), els=> GetPrices(els))
    .Subscribe(...)

不幸的是这个例子不起作用(至少不适合我:-)。给定这段代码:

internal class Program
{
    private static void Main(string[] args)
    {
        var source = new Subject<string>();
        var subscription = Observable.Using(
            () => new EventLoopScheduler(),
            scheduler => source
                .ObserveOn(scheduler)
                .Do(LongRunningAction))
            .Subscribe();

        source.OnNext("First action (2 seconds)");
        Thread.Sleep(TimeSpan.FromSeconds(1));
        subscription.Dispose(); // Scheduler is still busy!
        Console.ReadLine(); 
    }

    private static void LongRunningAction(string text) {
        Thread.Sleep(TimeSpan.FromSeconds(2));
        Console.WriteLine(text);
    }
}

我希望在 2 秒后看到一条没有任何错误的短信(即使订阅已在 1 秒后处理)。 EventLoopScheduler 无法取消正在进行的操作,这对我来说没问题。

您实际得到的是消息和未处理的 ObjectDisposedException

那么,这是一个错误还是我做错了? :-)

为了解决这个异常,我目前包装 EventLoopScheduler 并在 wrapper.Dispose() 上调用 scheduler.Schedule(() => scheduler.Dispose())

因此,被断章取义,我不得不做出回应。 :) 让我们将引用扩展到重要的部分:

You have no business disposing that EventLoopScheduler! Once you have passed it to other Rx Operators, you have passed on the responsibility for it.

问题是,您正试图让 观察者(订阅者)清理调度程序。但是观察者将调度器传递给了 observable。如果你想处理调度程序,你必须考虑现在 "owns" 它是那个可观察的。 observable 知道:

  • 订阅时
  • 取消订阅时
  • 当它向所有订阅者发送OnComplete/OnError时

有了这些信息,就可以很好地了解何时可以处理给定的任何调度程序。 (即便如此,如果您正在尝试进行通用清理,它需要在终结器中处理调度程序,因为这是它可以保证另一个订阅者不会在没有特殊知识的情况下出现的唯一一点。)

然而,单个订阅者无法保证拥有任何此类信息 - 潜在其他订阅者的知识,以及最后事件何时发送不会暴露给它。它传递给调度程序的可观察对象可以以各种时髦的方式参与其中:调用疯狂的睡眠时间长的方法;凭空捏造事件,只是因为它喜欢它;将活动推迟到下周二;通过在冰箱上贴一张便条并承诺到达那个 mañana 来响应取消订阅事件,诚实。

那么,您想每次都安全地清理那个调度程序吗?然后你需要让你的观察者去做。

虽然内置运算符不会为这些烦恼 - 我怀疑这不是一个大问题,因为在大多数用例中它只是没有必要。事实上,我认为我从未见过需要处理 EventLoopScheduler 的情况——它们一直被用于程序的生命周期。很容易认为你需要处置你看到的每个 IDisposable - 但实际上对于 Rx,这通常是没有必要的(特别是对于订阅,其中 Dispose 它实际上只是一个取消订阅的请求 - 而不是命令清理资源。当 IDisposable 制作了一个非常好的订阅句柄时,Rx 团队不想创建另一个接口。)

EventLoopScheduler 在线程不忙时挂起它的线程 - 所以大多数时候你不需要担心清理,除非你正在创建任意数量的线程(提示:你真的不需要这样做)。

如果你这样做,你可能想看看 NewThreadScheduler 是否会这样做,它实际上在幕后使用 EventLoopScheduler,在一个特殊的秘密(即内部)模式下退出如果调度程序队列为空,则线程 - 但否则重用它。是的,尽管普遍存在相反的误解,NewThreadScheduler 确实 重用线程,因此在单个订阅者的负载下不会产生很大的关联线程创建成本。只有当多个订阅者在玩时,才会出现多个线程,或者当它空闲时,下一个事件才会导致线程创建。

但是如果您使用 EventLoopScheduler,您可能会在一个地方使用它来将事物绑定到一个全局共享的事件循环(毕竟 - 这就是事件循环通常所做的 - 将事件集中到一个应用程序中的线程)——因此很少需要清理该线程,因为无论如何它都会在进程结束时消失。

我上面的评论是 James 的回答。 "answer" 在此提供 "fixes" 问题的示例代码。

不过我确实认为 EventLoopScheduler 存在错误。我不认为*如果它已经被处置,它应该继续递归地安排工作。

void Main()
{
    //In my example GetPrices is the source. 
    //  I meant that you could use an ELS to do some heavy work to get prices.
    //var source = new Subject<string>();   
    var subscription = Observable.Using(
        () => new EventLoopScheduler(),
        scheduler =>
        {
            return Observable.Create<string>((obs, ct) =>
            {
                var scheduleItem = scheduler.Schedule(0, (state,self) => {
                    //Do work to get price (network request? or Heavy CPU work?)
                    var price = state.ToString("c");
                    LongRunningAction(price);
                    obs.OnNext(price);
                    //Without this check, we see that the Scheduler will try to 
                    //  recursively call itself even when disposed.
                    if(!ct.IsCancellationRequested)
                        self(state+1);
                });
                return Task.FromResult(scheduleItem);
            });
        })
        .Subscribe();

    Thread.Sleep(TimeSpan.FromSeconds(1));
    subscription.Dispose(); // Scheduler is still busy!
    Console.ReadLine();
}

private static void LongRunningAction(string text)
{
    Thread.Sleep(TimeSpan.FromSeconds(2));
    Console.WriteLine(text);
}

*但我完全保留在我确信不是这样的情况下改变主意的权利。

FWIW:通常我只将 ELS 用作服务中的 readonly 字段,我想将线程专用于处理一些入站工作。例如我只想使用一个线程从网络或磁盘读取该服务。在本例中,我创建了一个 ELS,它将执行任何工作。然后在包含它的 class 被处置时被处置。正如 IntroToRx.com 中的示例所示,我认为我根本不会经常使用它。

好的,我有一些有用的东西。但它不是线程安全的,相关代码行已用注释标注。猜猜我应该打开一个错误票:-/

private static void Main(string[] args)
{
    var originSource = new Subject<string>();
    var subscription = UsingEventLoop(originSource)
        .Do(LongRunningAction) // runs on EventLoopScheduler thread
        .Subscribe();

    originSource.OnNext("First action (appears after 2 seconds)");
    originSource.OnNext("Second action (must not appear");

    Thread.Sleep(TimeSpan.FromSeconds(1));
    subscription.Dispose(); // Scheduler is still busy with first action!

    Console.WriteLine("Press any key to exit.");
    Console.ReadLine();
}

private static IObservable<TValue> UsingEventLoop<TValue>(IObservable<TValue> source)
{
    return Observable.Using(
        () => new EventLoopScheduler(),
        scheduler => Observable.Create<TValue>((obs, ct) =>
        {
            return Task.FromResult(source.Subscribe(value =>
            {
                // The following check+call is NOT thread safe!
                if (!ct.IsCancellationRequested) 
                {
                    scheduler.Schedule(() => obs.OnNext(value));
                }
            }));
        }));
}

private static void LongRunningAction<TValue>(TValue value) {
    Thread.Sleep(TimeSpan.FromSeconds(2));
    Console.WriteLine(value);
}