可观察的订阅如何优雅地终止?

How is an observable subscription gracefully terminated?

我正在尝试使用 Reactive Extensions (Rx) 来处理数据流。但是,每个元素的处理可能需要一些时间。为了中断处理,我使用了 CancellationToken,它有效地停止了订阅。

当请求取消时,如何优雅地完成当前工作并正确终止而不丢失任何数据?

例子

var cts = new CancellationTokenSource();
cts.Token.Register(() => Console.WriteLine("Token cancelled."));

var observable = Observable
    .Interval(TimeSpan.FromMilliseconds(250));

observable
    .Subscribe(
        value =>
            {
                Console.WriteLine(value);
                Thread.Sleep(500); // Simulate processing
                
                if (cts.Token.IsCancellationRequested)
                {
                    Console.WriteLine("Cancellation detected on {0}.", value);
                    Thread.Sleep(500); // Simulate some time consuming shutdown
                    Console.WriteLine("Cleaning up done for {0}.", value);
                }
            },
        () => Console.WriteLine("Completed"),
        cts.Token);
        
Console.ReadLine();
cts.Cancel();
Console.WriteLine("Job terminated.");

输出

0
1
2
Token cancelled.
Job terminated.
Cancellation detected on 2.
Cleaning up done for 2.

从输出中可以看出,“Job terminated”这行不是最后一行,这意味着清理在应用程序终止之前没有足够的时间完成。

预期输出

0
1
2
Token cancelled.
Cancellation detected on 2.
Cleaning up done for 2.
Job terminated.

“作业终止”行是要打印的最后一行。 “取消”和“清洁”行已被允许慢慢来。

(编辑:添加了预期输出)

如果我理解正确,这不是 Rx 问题,而是 'Whatever you are you doing in the Subscribe' 问题。您的订阅操作需要半秒,清理可能需要另外半秒,而您的作业终止需要几微秒。您希望在取消和终止之间挤出什么?

我能给你的最好建议是让订阅操作比 Thread.Sleep 调用更好地尊重取消令牌。

使用 together with the answer to a question about waiting before terminating 的答案,我想出了一个满足我要求的解决方案。

我原来的问题是我找不到等待订阅线程的方法。上面链接的答案让我以三种方式重构代码:

  1. 我将取消逻辑从订阅移到了可观察对象中。

  2. 订阅被包装在它自己的 Task 中(所以执行可以继续到 ReadLine-statement)。

  3. 引入了ManualResetEvent来控制应用程序退出策略。

解决方案:

var reset = new ManualResetEvent(false);

var cts = new CancellationTokenSource();
cts.Token.Register(() => Console.WriteLine("Token cancelled."));

var observable = Observable
    .Interval(TimeSpan.FromMilliseconds(250))
    .TakeWhile(x => !cts.Token.IsCancellationRequested)
    .Finally(
        () =>
            {
                Console.WriteLine("Finally: Beginning finalization.");
                Thread.Sleep(500);
                Console.WriteLine("Finally: Done with finalization.");
                reset.Set();
            });

await Task.Factory.StartNew(
    () => observable
        .Subscribe(
            value =>
                {
                    Console.WriteLine("Begin: {0}", value);
                    Thread.Sleep(2000);
                    Console.WriteLine("End: {0}", value);
                },
            () => Console.WriteLine("Completed: Subscription completed.")),
    TaskCreationOptions.LongRunning);

Console.ReadLine();
cts.Cancel();
reset.WaitOne();
Console.WriteLine("Job terminated.");

输出:

Begin: 0
End: 0
Begin: 1
Token cancelled.
End: 1
Completed: Subscription completed.
Finally: Beginning finalization.
Finally: Done with finalization.
Job terminated.

作为 Reactive Extensions 的新手,我不知道这是否是解决我问题的最佳方法。但这对问题中发布的示例来说是一个很大的改进,因为它满足了我的要求:

  • 允许每个 OnNext 动作 运行 完成。
  • 应用程序等待流处理完成(由 ManualResetEvent 发出信号)。
  • TakeWhile 方法中,流取消逻辑被移动到生产者(而不是消费者)。
  • 应用程序终止逻辑是对生产者 Finally 方法中流取消的反应。

这是一个更好的解决方案。

Observables 是 (a)waitable。可观察对象的订阅是不可等待的。所以如果你想等待你的订阅代码完成,而不是诉诸人工解决方案,比如使用 ManualResetEvents,你应该让你的订阅代码成为派生可观察对象的副作用,并且(a)等待那个可观察对象。您问题中提供的示例有额外的要求,这使事情变得有点复杂,但没有那么多:

  1. 您想在订阅可观察对象和等待它完成之间做其他事情(Console.ReadLine() 等)。

  2. 您想在取消 CancellationToken 时终止可观察对象。

下面是如何满足这些要求的示例。它仅显示解决此问题的众多可用方法之一:

var cts = new CancellationTokenSource();
cts.Token.Register(() => Console.WriteLine("Token cancelled."));

var observable = Observable
    .Interval(TimeSpan.FromMilliseconds(250));

var withCancellation = observable
    .TakeUntil(Observable.Create<Unit>(observer =>
        cts.Token.Register(() => observer.OnNext(default))));

var withSideEffectsAndCancellation = withCancellation
    .Do(value =>
    {
        Console.WriteLine(value);
        Thread.Sleep(500);

        if (cts.Token.IsCancellationRequested)
        {
            Console.WriteLine("Cancellation detected on {0}.", value);
            Thread.Sleep(500);
            Console.WriteLine("Cleaning up done for {0}.", value);
        }
    }, () => Console.WriteLine("Completed"));

var hotWithSideEffectsAndCancellation = withSideEffectsAndCancellation
    .Publish()
    .AutoConnect(0);

Console.ReadLine();
cts.Cancel();

hotWithSideEffectsAndCancellation.DefaultIfEmpty().Wait();
// or await hotWithSideEffectsAndCancellation.DefaultIfEmpty();
Console.WriteLine("Job terminated.");

解释:

  1. .TakeUntil...cts.Token.Register... 是一种惯用的方式,可以在取消 cts.Token 时立即取消订阅 Interval 可观察对象。它是从 relevant question 复制粘贴的。您也可以使用更简单的 .TakeWhile(x => !cts.Token.IsCancellationRequested),前提是您可以接受响应性稍差的取消。

  2. Do 运算符是执行订阅副作用的自然方式,因为它与 Subscribe 方法具有相同的参数。

  3. .Publish().AutoConnect(0); 立即使序列变热。 AutoConnect 运算符没有提供与底层可观察对象断开连接的方法(与 RefCount 运算符相反),但在这种特殊情况下不需要断开连接功能。底层可观察对象的生命周期已经由我们之前附加的 CancellationToken 控制。

  4. .Wait() 之前的 .DefaultIfEmpty() 是为了防止在生成任何元素之前取消序列的边缘情况下的 InvalidOperationException 。如果您 await 异步序列也是必需的。这些等待 observable 的机制(以及 RunAsyncToTask 运算符等其他机制)正在返回 observable 发出的最后一个值,当不存在这样的值时,它们会感到沮丧。