Rx .NET 中的调度

Scheduling in Rx .NET

预期全部在 .NET Core 2.0 控制台应用程序的主线程上执行,因此输出被阻塞 10 秒:

    static void Main(string[] args)
    {
        WriteLine($"We are on {Thread.CurrentThread.ManagedThreadId}");

        var subject = new Subject<long>();
        var subscription = subject.Subscribe(
            i => WriteLine($"tick on {Thread.CurrentThread.ManagedThreadId}"));

        var timer = Observable.Interval(TimeSpan.FromSeconds(1))
            .SubscribeOn(Scheduler.CurrentThread)
            .Subscribe(i => subject.OnNext(i));

        Thread.Sleep(10000);
    }

不过情况并非如此 – 每隔一秒就会有一条新线被随机线程调度:

We are on 1
tick on 4
tick on 5
tick on 4
tick on 4
tick on 4
tick on 4
tick on 4
tick on 4
tick on 5

我做错了什么?

Scheduler.CurrentThread / CurrentThreadScheduler 将在调用 schedule 的同一线程上对项目进行排队,这将是计时器恰好在 运行 上的线程。调用 Scheduler.CurrentThread 不会将通过它安排的项目的执行固定到您调用 Scheduler.CurrentThread 的线程,而是固定到调用 .Schedule().

的线程

此外,您调用 SubscribeOn() 只会影响将要进行 .Subscribe() 调用的线程。如果你想控制项目处理的执行,你宁愿调用 .ObserveOn().

如果您希望主线程上的所有内容都运行,我建议运行在主线程上设置计时器,方法是在可观察的间隔上指定一个调度程序:

Observable.Interval(TimeSpan.FromSeconds(1), Scheduler.CurrentThread)