可观察的取消令牌
Cancellation token for observable
如果在单击开始按钮时创建了以下可观察对象,即从停止按钮创建,我如何取消以下类型的 Rx 可观察对象。
var instance = ThreadPoolScheduler.Instance;
Observable.Interval(TimeSpan.FromSeconds(2), instance)
.Subscribe(_ =>
{
Console.WriteLine(DateTime.Now); // dummy event
}
);
您保留 Subscribe
返回的 IDisposable
,并对其调用 Dispose
。
很可能有一种方法可以将基于 Rx IDisposable
的取消订阅与开箱即用的 CancellationToken
集成,但只需调用 Dispose
就可以开始了。 (您总是可以使用取消令牌注册一个延续来调用处置...)
这里有一个简短但完整的示例来演示:
using System;
using System.Reactive.Concurrency;
using System.Reactive.Linq;
using System.Threading;
class Program
{
static void Main(string[] args)
{
var instance = ThreadPoolScheduler.Instance;
var cts = new CancellationTokenSource(TimeSpan.FromSeconds(5));
var disposable = Observable
.Interval(TimeSpan.FromSeconds(0.5), instance)
.Subscribe(_ => Console.WriteLine(DateTime.UtcNow));
cts.Token.Register(() => disposable.Dispose());
Thread.Sleep(10000);
}
}
只需使用 Subscribe
的重载之一,它需要一个 CancellationToken
:
observable.Subscribe(_ => Console.WriteLine(DateTime.UtcNow), cancellationToken);
这简化了 Jon Skeet 的示例:
using System;
using System.Reactive.Concurrency;
using System.Reactive.Linq;
using System.Threading;
class Program
{
static void Main(string[] args)
{
var instance = ThreadPoolScheduler.Instance;
var cts = new CancellationTokenSource(TimeSpan.FromSeconds(5));
Observable.Interval(TimeSpan.FromSeconds(0.5), instance)
.Subscribe(_ => Console.WriteLine(DateTime.UtcNow), cts.Token);
Thread.Sleep(10000);
}
}
如果在单击开始按钮时创建了以下可观察对象,即从停止按钮创建,我如何取消以下类型的 Rx 可观察对象。
var instance = ThreadPoolScheduler.Instance;
Observable.Interval(TimeSpan.FromSeconds(2), instance)
.Subscribe(_ =>
{
Console.WriteLine(DateTime.Now); // dummy event
}
);
您保留 Subscribe
返回的 IDisposable
,并对其调用 Dispose
。
很可能有一种方法可以将基于 Rx IDisposable
的取消订阅与开箱即用的 CancellationToken
集成,但只需调用 Dispose
就可以开始了。 (您总是可以使用取消令牌注册一个延续来调用处置...)
这里有一个简短但完整的示例来演示:
using System;
using System.Reactive.Concurrency;
using System.Reactive.Linq;
using System.Threading;
class Program
{
static void Main(string[] args)
{
var instance = ThreadPoolScheduler.Instance;
var cts = new CancellationTokenSource(TimeSpan.FromSeconds(5));
var disposable = Observable
.Interval(TimeSpan.FromSeconds(0.5), instance)
.Subscribe(_ => Console.WriteLine(DateTime.UtcNow));
cts.Token.Register(() => disposable.Dispose());
Thread.Sleep(10000);
}
}
只需使用 Subscribe
的重载之一,它需要一个 CancellationToken
:
observable.Subscribe(_ => Console.WriteLine(DateTime.UtcNow), cancellationToken);
这简化了 Jon Skeet 的示例:
using System;
using System.Reactive.Concurrency;
using System.Reactive.Linq;
using System.Threading;
class Program
{
static void Main(string[] args)
{
var instance = ThreadPoolScheduler.Instance;
var cts = new CancellationTokenSource(TimeSpan.FromSeconds(5));
Observable.Interval(TimeSpan.FromSeconds(0.5), instance)
.Subscribe(_ => Console.WriteLine(DateTime.UtcNow), cts.Token);
Thread.Sleep(10000);
}
}