Rx .NET 中的调度
Scheduling in Rx .NET
预期全部在 .NET Core 2.0 控制台应用程序的主线程上执行,因此输出被阻塞 10 秒:
static void Main(string[] args)
{
WriteLine($"We are on {Thread.CurrentThread.ManagedThreadId}");
var subject = new Subject<long>();
var subscription = subject.Subscribe(
i => WriteLine($"tick on {Thread.CurrentThread.ManagedThreadId}"));
var timer = Observable.Interval(TimeSpan.FromSeconds(1))
.SubscribeOn(Scheduler.CurrentThread)
.Subscribe(i => subject.OnNext(i));
Thread.Sleep(10000);
}
不过情况并非如此 – 每隔一秒就会有一条新线被随机线程调度:
We are on 1
tick on 4
tick on 5
tick on 4
tick on 4
tick on 4
tick on 4
tick on 4
tick on 4
tick on 5
我做错了什么?
Scheduler.CurrentThread
/ CurrentThreadScheduler
将在调用 schedule 的同一线程上对项目进行排队,这将是计时器恰好在 运行 上的线程。调用 Scheduler.CurrentThread
不会将通过它安排的项目的执行固定到您调用 Scheduler.CurrentThread
的线程,而是固定到调用 .Schedule()
.
的线程
此外,您调用 SubscribeOn()
只会影响将要进行 .Subscribe()
调用的线程。如果你想控制项目处理的执行,你宁愿调用 .ObserveOn()
.
如果您希望主线程上的所有内容都运行,我建议运行在主线程上设置计时器,方法是在可观察的间隔上指定一个调度程序:
Observable.Interval(TimeSpan.FromSeconds(1), Scheduler.CurrentThread)
预期全部在 .NET Core 2.0 控制台应用程序的主线程上执行,因此输出被阻塞 10 秒:
static void Main(string[] args)
{
WriteLine($"We are on {Thread.CurrentThread.ManagedThreadId}");
var subject = new Subject<long>();
var subscription = subject.Subscribe(
i => WriteLine($"tick on {Thread.CurrentThread.ManagedThreadId}"));
var timer = Observable.Interval(TimeSpan.FromSeconds(1))
.SubscribeOn(Scheduler.CurrentThread)
.Subscribe(i => subject.OnNext(i));
Thread.Sleep(10000);
}
不过情况并非如此 – 每隔一秒就会有一条新线被随机线程调度:
We are on 1 tick on 4 tick on 5 tick on 4 tick on 4 tick on 4 tick on 4 tick on 4 tick on 4 tick on 5
我做错了什么?
Scheduler.CurrentThread
/ CurrentThreadScheduler
将在调用 schedule 的同一线程上对项目进行排队,这将是计时器恰好在 运行 上的线程。调用 Scheduler.CurrentThread
不会将通过它安排的项目的执行固定到您调用 Scheduler.CurrentThread
的线程,而是固定到调用 .Schedule()
.
此外,您调用 SubscribeOn()
只会影响将要进行 .Subscribe()
调用的线程。如果你想控制项目处理的执行,你宁愿调用 .ObserveOn()
.
如果您希望主线程上的所有内容都运行,我建议运行在主线程上设置计时器,方法是在可观察的间隔上指定一个调度程序:
Observable.Interval(TimeSpan.FromSeconds(1), Scheduler.CurrentThread)