可观察的订阅如何优雅地终止?
How is an observable subscription gracefully terminated?
我正在尝试使用 Reactive Extensions (Rx) 来处理数据流。但是,每个元素的处理可能需要一些时间。为了中断处理,我使用了 CancellationToken
,它有效地停止了订阅。
当请求取消时,如何优雅地完成当前工作并正确终止而不丢失任何数据?
例子
var cts = new CancellationTokenSource();
cts.Token.Register(() => Console.WriteLine("Token cancelled."));
var observable = Observable
.Interval(TimeSpan.FromMilliseconds(250));
observable
.Subscribe(
value =>
{
Console.WriteLine(value);
Thread.Sleep(500); // Simulate processing
if (cts.Token.IsCancellationRequested)
{
Console.WriteLine("Cancellation detected on {0}.", value);
Thread.Sleep(500); // Simulate some time consuming shutdown
Console.WriteLine("Cleaning up done for {0}.", value);
}
},
() => Console.WriteLine("Completed"),
cts.Token);
Console.ReadLine();
cts.Cancel();
Console.WriteLine("Job terminated.");
输出
0
1
2
Token cancelled.
Job terminated.
Cancellation detected on 2.
Cleaning up done for 2.
从输出中可以看出,“Job terminated”这行不是最后一行,这意味着清理在应用程序终止之前没有足够的时间完成。
预期输出
0
1
2
Token cancelled.
Cancellation detected on 2.
Cleaning up done for 2.
Job terminated.
“作业终止”行是要打印的最后一行。 “取消”和“清洁”行已被允许慢慢来。
(编辑:添加了预期输出)
如果我理解正确,这不是 Rx 问题,而是 'Whatever you are you doing in the Subscribe' 问题。您的订阅操作需要半秒,清理可能需要另外半秒,而您的作业终止需要几微秒。您希望在取消和终止之间挤出什么?
我能给你的最好建议是让订阅操作比 Thread.Sleep
调用更好地尊重取消令牌。
使用 together with the answer to a question about waiting before terminating 的答案,我想出了一个满足我要求的解决方案。
我原来的问题是我找不到等待订阅线程的方法。上面链接的答案让我以三种方式重构代码:
我将取消逻辑从订阅移到了可观察对象中。
订阅被包装在它自己的 Task
中(所以执行可以继续到 ReadLine
-statement)。
引入了ManualResetEvent
来控制应用程序退出策略。
解决方案:
var reset = new ManualResetEvent(false);
var cts = new CancellationTokenSource();
cts.Token.Register(() => Console.WriteLine("Token cancelled."));
var observable = Observable
.Interval(TimeSpan.FromMilliseconds(250))
.TakeWhile(x => !cts.Token.IsCancellationRequested)
.Finally(
() =>
{
Console.WriteLine("Finally: Beginning finalization.");
Thread.Sleep(500);
Console.WriteLine("Finally: Done with finalization.");
reset.Set();
});
await Task.Factory.StartNew(
() => observable
.Subscribe(
value =>
{
Console.WriteLine("Begin: {0}", value);
Thread.Sleep(2000);
Console.WriteLine("End: {0}", value);
},
() => Console.WriteLine("Completed: Subscription completed.")),
TaskCreationOptions.LongRunning);
Console.ReadLine();
cts.Cancel();
reset.WaitOne();
Console.WriteLine("Job terminated.");
输出:
Begin: 0
End: 0
Begin: 1
Token cancelled.
End: 1
Completed: Subscription completed.
Finally: Beginning finalization.
Finally: Done with finalization.
Job terminated.
作为 Reactive Extensions 的新手,我不知道这是否是解决我问题的最佳方法。但这对问题中发布的示例来说是一个很大的改进,因为它满足了我的要求:
- 允许每个 OnNext 动作 运行 完成。
- 应用程序等待流处理完成(由
ManualResetEvent
发出信号)。
- 在
TakeWhile
方法中,流取消逻辑被移动到生产者(而不是消费者)。
- 应用程序终止逻辑是对生产者
Finally
方法中流取消的反应。
这是一个更好的解决方案。
Observables 是 (a)waitable。可观察对象的订阅是不可等待的。所以如果你想等待你的订阅代码完成,而不是诉诸人工解决方案,比如使用 ManualResetEvent
s,你应该让你的订阅代码成为派生可观察对象的副作用,并且(a)等待那个可观察对象。您问题中提供的示例有额外的要求,这使事情变得有点复杂,但没有那么多:
您想在订阅可观察对象和等待它完成之间做其他事情(Console.ReadLine()
等)。
您想在取消 CancellationToken
时终止可观察对象。
下面是如何满足这些要求的示例。它仅显示解决此问题的众多可用方法之一:
var cts = new CancellationTokenSource();
cts.Token.Register(() => Console.WriteLine("Token cancelled."));
var observable = Observable
.Interval(TimeSpan.FromMilliseconds(250));
var withCancellation = observable
.TakeUntil(Observable.Create<Unit>(observer =>
cts.Token.Register(() => observer.OnNext(default))));
var withSideEffectsAndCancellation = withCancellation
.Do(value =>
{
Console.WriteLine(value);
Thread.Sleep(500);
if (cts.Token.IsCancellationRequested)
{
Console.WriteLine("Cancellation detected on {0}.", value);
Thread.Sleep(500);
Console.WriteLine("Cleaning up done for {0}.", value);
}
}, () => Console.WriteLine("Completed"));
var hotWithSideEffectsAndCancellation = withSideEffectsAndCancellation
.Publish()
.AutoConnect(0);
Console.ReadLine();
cts.Cancel();
hotWithSideEffectsAndCancellation.DefaultIfEmpty().Wait();
// or await hotWithSideEffectsAndCancellation.DefaultIfEmpty();
Console.WriteLine("Job terminated.");
解释:
.TakeUntil...cts.Token.Register...
是一种惯用的方式,可以在取消 cts.Token
时立即取消订阅 Interval
可观察对象。它是从 relevant question 复制粘贴的。您也可以使用更简单的 .TakeWhile(x => !cts.Token.IsCancellationRequested)
,前提是您可以接受响应性稍差的取消。
Do
运算符是执行订阅副作用的自然方式,因为它与 Subscribe
方法具有相同的参数。
.Publish().AutoConnect(0);
立即使序列变热。 AutoConnect
运算符没有提供与底层可观察对象断开连接的方法(与 RefCount
运算符相反),但在这种特殊情况下不需要断开连接功能。底层可观察对象的生命周期已经由我们之前附加的 CancellationToken
控制。
.Wait()
之前的 .DefaultIfEmpty()
是为了防止在生成任何元素之前取消序列的边缘情况下的 InvalidOperationException
。如果您 await
异步序列也是必需的。这些等待 observable 的机制(以及 RunAsync
和 ToTask
运算符等其他机制)正在返回 observable 发出的最后一个值,当不存在这样的值时,它们会感到沮丧。
我正在尝试使用 Reactive Extensions (Rx) 来处理数据流。但是,每个元素的处理可能需要一些时间。为了中断处理,我使用了 CancellationToken
,它有效地停止了订阅。
当请求取消时,如何优雅地完成当前工作并正确终止而不丢失任何数据?
例子
var cts = new CancellationTokenSource();
cts.Token.Register(() => Console.WriteLine("Token cancelled."));
var observable = Observable
.Interval(TimeSpan.FromMilliseconds(250));
observable
.Subscribe(
value =>
{
Console.WriteLine(value);
Thread.Sleep(500); // Simulate processing
if (cts.Token.IsCancellationRequested)
{
Console.WriteLine("Cancellation detected on {0}.", value);
Thread.Sleep(500); // Simulate some time consuming shutdown
Console.WriteLine("Cleaning up done for {0}.", value);
}
},
() => Console.WriteLine("Completed"),
cts.Token);
Console.ReadLine();
cts.Cancel();
Console.WriteLine("Job terminated.");
输出
0
1
2
Token cancelled.
Job terminated.
Cancellation detected on 2.
Cleaning up done for 2.
从输出中可以看出,“Job terminated”这行不是最后一行,这意味着清理在应用程序终止之前没有足够的时间完成。
预期输出
0
1
2
Token cancelled.
Cancellation detected on 2.
Cleaning up done for 2.
Job terminated.
“作业终止”行是要打印的最后一行。 “取消”和“清洁”行已被允许慢慢来。
(编辑:添加了预期输出)
如果我理解正确,这不是 Rx 问题,而是 'Whatever you are you doing in the Subscribe' 问题。您的订阅操作需要半秒,清理可能需要另外半秒,而您的作业终止需要几微秒。您希望在取消和终止之间挤出什么?
我能给你的最好建议是让订阅操作比 Thread.Sleep
调用更好地尊重取消令牌。
使用
我原来的问题是我找不到等待订阅线程的方法。上面链接的答案让我以三种方式重构代码:
我将取消逻辑从订阅移到了可观察对象中。
订阅被包装在它自己的
Task
中(所以执行可以继续到ReadLine
-statement)。引入了
ManualResetEvent
来控制应用程序退出策略。
解决方案:
var reset = new ManualResetEvent(false);
var cts = new CancellationTokenSource();
cts.Token.Register(() => Console.WriteLine("Token cancelled."));
var observable = Observable
.Interval(TimeSpan.FromMilliseconds(250))
.TakeWhile(x => !cts.Token.IsCancellationRequested)
.Finally(
() =>
{
Console.WriteLine("Finally: Beginning finalization.");
Thread.Sleep(500);
Console.WriteLine("Finally: Done with finalization.");
reset.Set();
});
await Task.Factory.StartNew(
() => observable
.Subscribe(
value =>
{
Console.WriteLine("Begin: {0}", value);
Thread.Sleep(2000);
Console.WriteLine("End: {0}", value);
},
() => Console.WriteLine("Completed: Subscription completed.")),
TaskCreationOptions.LongRunning);
Console.ReadLine();
cts.Cancel();
reset.WaitOne();
Console.WriteLine("Job terminated.");
输出:
Begin: 0
End: 0
Begin: 1
Token cancelled.
End: 1
Completed: Subscription completed.
Finally: Beginning finalization.
Finally: Done with finalization.
Job terminated.
作为 Reactive Extensions 的新手,我不知道这是否是解决我问题的最佳方法。但这对问题中发布的示例来说是一个很大的改进,因为它满足了我的要求:
- 允许每个 OnNext 动作 运行 完成。
- 应用程序等待流处理完成(由
ManualResetEvent
发出信号)。 - 在
TakeWhile
方法中,流取消逻辑被移动到生产者(而不是消费者)。 - 应用程序终止逻辑是对生产者
Finally
方法中流取消的反应。
这是一个更好的解决方案。
Observables 是 (a)waitable。可观察对象的订阅是不可等待的。所以如果你想等待你的订阅代码完成,而不是诉诸人工解决方案,比如使用 ManualResetEvent
s,你应该让你的订阅代码成为派生可观察对象的副作用,并且(a)等待那个可观察对象。您问题中提供的示例有额外的要求,这使事情变得有点复杂,但没有那么多:
您想在订阅可观察对象和等待它完成之间做其他事情(
Console.ReadLine()
等)。您想在取消
CancellationToken
时终止可观察对象。
下面是如何满足这些要求的示例。它仅显示解决此问题的众多可用方法之一:
var cts = new CancellationTokenSource();
cts.Token.Register(() => Console.WriteLine("Token cancelled."));
var observable = Observable
.Interval(TimeSpan.FromMilliseconds(250));
var withCancellation = observable
.TakeUntil(Observable.Create<Unit>(observer =>
cts.Token.Register(() => observer.OnNext(default))));
var withSideEffectsAndCancellation = withCancellation
.Do(value =>
{
Console.WriteLine(value);
Thread.Sleep(500);
if (cts.Token.IsCancellationRequested)
{
Console.WriteLine("Cancellation detected on {0}.", value);
Thread.Sleep(500);
Console.WriteLine("Cleaning up done for {0}.", value);
}
}, () => Console.WriteLine("Completed"));
var hotWithSideEffectsAndCancellation = withSideEffectsAndCancellation
.Publish()
.AutoConnect(0);
Console.ReadLine();
cts.Cancel();
hotWithSideEffectsAndCancellation.DefaultIfEmpty().Wait();
// or await hotWithSideEffectsAndCancellation.DefaultIfEmpty();
Console.WriteLine("Job terminated.");
解释:
.TakeUntil...cts.Token.Register...
是一种惯用的方式,可以在取消cts.Token
时立即取消订阅Interval
可观察对象。它是从 relevant question 复制粘贴的。您也可以使用更简单的.TakeWhile(x => !cts.Token.IsCancellationRequested)
,前提是您可以接受响应性稍差的取消。Do
运算符是执行订阅副作用的自然方式,因为它与Subscribe
方法具有相同的参数。.Publish().AutoConnect(0);
立即使序列变热。AutoConnect
运算符没有提供与底层可观察对象断开连接的方法(与RefCount
运算符相反),但在这种特殊情况下不需要断开连接功能。底层可观察对象的生命周期已经由我们之前附加的CancellationToken
控制。.Wait()
之前的.DefaultIfEmpty()
是为了防止在生成任何元素之前取消序列的边缘情况下的InvalidOperationException
。如果您await
异步序列也是必需的。这些等待 observable 的机制(以及RunAsync
和ToTask
运算符等其他机制)正在返回 observable 发出的最后一个值,当不存在这样的值时,它们会感到沮丧。