反应性扩展序列化任务每个可取消

Reactive extension serialized task each cancellable

我试图用 Rx 扩展解决方案回答 another question。在制定解决方案时,我发现了一些奇怪的事情。

static Random rand = new Random();

static void Main(string[] args) {
    //var obs = Observable.Interval(TimeSpan.FromMilliseconds(250)).Do<long>(i =>
    var obs = Observable.Interval(TimeSpan.FromMilliseconds(25)).Do<long>(i =>
    {
        CancellationTokenSource source = new CancellationTokenSource(25);
        //CancellationTokenSource source = new CancellationTokenSource(250);
        ReadNext(source.Token, i);
    }).Publish();
    var disp = obs.Connect();
    Console.ReadKey();
    disp.Dispose();
    Console.ReadKey();
}

static private void ReadNext(CancellationToken token, long actual) {
    int i = rand.Next(4);
    Stopwatch watch = new Stopwatch();
    watch.Start();
    for(int j = 0; j < i; j++) {
        //Thread.Sleep(100);
        Thread.Sleep(10);
        if(token.IsCancellationRequested) {
            Console.WriteLine(string.Format("method cancelled. cycles: {0}, should be 3. Now should be last (2): {1}", i, j));
            return;
        }
    }
    Console.WriteLine(string.Format("method done in {0} cycles. Preserved index: {1}. Elapsed time: {2}", i, actual, watch.ElapsedMilliseconds));
    watch.Stop();
}

取消超时有问题。不知何故,当第三个周期发生时(我们已经等了 ~30 毫秒),ReadNext 并没有每次都被取消。

查看打印输出:

method done in 1 cycles. Preserved index: 7. Elapsed time: 9
method done in 1 cycles. Preserved index: 8. Elapsed time: 9
method done in 0 cycles. Preserved index: 9. Elapsed time: 0
method cancelled. cycles: 3, should be 3. Now should be last (2): 2
method done in 1 cycles. Preserved index: 11. Elapsed time: 9
method done in 2 cycles. Preserved index: 12. Elapsed time: 19
method done in 2 cycles. Preserved index: 13. Elapsed time: 19
method done in 0 cycles. Preserved index: 14. Elapsed time: 0
method done in 2 cycles. Preserved index: 15. Elapsed time: 19
method done in 0 cycles. Preserved index: 16. Elapsed time: 0
method done in 1 cycles. Preserved index: 17. Elapsed time: 9
method cancelled. cycles: 3, should be 3. Now should be last (2): 2
method done in 1 cycles. Preserved index: 19. Elapsed time: 9
method done in 3 cycles. Preserved index: 20. Elapsed time: 29  <- bug.
method done in 2 cycles. Preserved index: 21. Elapsed time: 19
method done in 1 cycles. Preserved index: 22. Elapsed time: 9
method done in 1 cycles. Preserved index: 23. Elapsed time: 9
method done in 2 cycles. Preserved index: 24. Elapsed time: 19
method done in 2 cycles. Preserved index: 25. Elapsed time: 19
method done in 2 cycles. Preserved index: 26. Elapsed time: 19
method done in 1 cycles. Preserved index: 27. Elapsed time: 10
method done in 1 cycles. Preserved index: 28. Elapsed time: 9
method done in 3 cycles. Preserved index: 29. Elapsed time: 29  <- bug.
method done in 1 cycles. Preserved index: 30. Elapsed time: 9

我是否必须监听其他一些调度程序,以确保在 25 毫秒后取消令牌肯定被取消,或者其他导致错误的原因?

编辑

如果我将睡眠顺序升级一个(检查上面的注释代码),它会起作用。问题是 Thread.Sleep 不够精确。

Windows不是实时的OS。一般来说,系统定时器 运行 在 60Hz,这意味着它们只能精确到 16.7ms。再加上线程的数量比 运行 它们的物理内核多得多,而且您不能指望在不知道自己在做什么的情况下编写具有精确时序的代码。

因此,无论何时编写定时器代码,都假设您启动的任何定时器都将以 +/- 16 毫秒的精度触发。