Observable.Delay 或 Observable.Buffer 重用同一个线程

Observable.Delay or Observable.Buffer reusing same thread

是否有某些版本的 Observable.Delay 或 Observable.Buffer 不为其计时器使用新线程?也许精度较低..

我有一个场景,我需要在每秒产生数千条消息的可观察对象上调用 Observable.Delay,这会创建大量线程..

谢谢。

如果你想限制线程的数量那么你只需要考虑你使用的调度器,这与创建的定时器数量无关。基于时间的操作根据到期时间安排操作,并且它是决定如何在正确时间分派事件的调度程序 - Rx 智能地知道实际创建了多少计时器,并且此机制取决于所使用的调度程序。

基于时间的运算符 (Scheduler.Default) 使用的大多数平台上的默认调度程序将使用任务池获取线程来调度在未来时间安排的事件,这就是为什么你会通常会看到不同的线程被用来分派事件。

控制线程的一种方法是使用 EventLoopScheduler 并确保在使用基于时间的操作时指定它。这将在同一线程上分派所有事件。例如:

var scheduler = new EventLoopScheduler();

Observable.Return(1)
          .Delay(TimeSpan.FromSeconds(4), scheduler)
          .Subscribe(x => Console.WriteLine(x.ToString() + ": "
              + Thread.CurrentThread.ManagedThreadId));
Observable.Return(2).Delay(TimeSpan.FromSeconds(2), scheduler)
          .Subscribe(x => Console.WriteLine(x.ToString() + ": "
              + Thread.CurrentThread.ManagedThreadId));

Console.WriteLine("Done.");

将输出如下内容(ThreadId 当然会有所不同):

Done.
2: 3
1: 3

而如果将第一行更改为 var scheduler = Scheduler.Default;,您会看到不同的线程 ID。

Rx 中的计时器主题非常复杂,对于这种格式来说可能有点过于宽泛 - this excellent post 涵盖了很多内部细节。请往下看部分 "It's all about time."