Rxcpp 中的调度器

Schedulers in Rxcpp

我正在尝试找出 Rx 的 C++ version 中的调度模型。

了解 C# 版本,其中有一个带有一个 Schedule 方法的简单接口; C++ 版本看起来相当复杂,有调度程序、工作程序和协调等内容。

我缺少的一个主要部分是线程池调度程序的实现,是否存在其他名称?我将如何自己实施?写在上面PPL (Windows)? If I need a serialized (Actor like) observer above it, what should I use? Peeking here and here可以说明这不是一件小事

这真的有助于获得关于该主题的某种概述,因为 official 文档是自动生成的并且仍然非常稀少。

是的,生成的文档是新的,尚未记录调度。

rxcpp v2 中的调度器基于 RxJava 使用的调度器和工作器构造(Eric Meijer 参与其中) RxJava 的文档将对调度程序和工作程序进行解释。 rxcpp 添加 schedulable、coordination 和 coordinator。

scheduler 拥有由 now() 方法公开的时间线。 scheduler 也是该时间线中 worker 的工厂。由于调度程序拥有时间线,因此可以构建时间旅行的调度程序。虚拟调度程序是测试调度程序的基础,它使用它在毫秒内完成多秒测试。

worker 拥有时间轴的待处理 schedulable 队列,并且具有生命周期。当达到 schedulable 的时间时,schedulable 是 运行。队列维护插入顺序,以便当 N schedulable 具有相同的目标时间时,它们按插入队列的顺序排列 运行。 worker 保证每个 schedulable 在下一个 schedulable 开始之前完成。当 worker 的生命周期被取消订阅时,所有未决的 schedulable 将被丢弃。

schedulable 拥有一个函数,并且有一个 worker 和一个 lifetime。当取消订阅 schedulable 的生命周期时,将不会调用 schedulable 函数。 schedulable 被传递给函数并允许函数重新安排自己或在同一个工作人员上安排其他事情。

新概念是协调和协调员。我添加这些是为了简化运算符实现并在运算符实现中引入按使用付费。具体来说,在 Rx.NET 和 RxJava 中,运算符使用原子操作和同步原语来协调来自多个流的消息,即使所有流都在同一个线程上(如 UI 事件)。默认使用 rxcpp 中的 identity_. . . 坐标,没有开销。 syncronize_. . .observe_on_. . . 协调分别使用 mutex 和 queue-onto-a-worker 来安全地交错多个流。

coordinationcoordinator 的工厂,有 scheduler.

coordinator 有一个 worker,是协调 observables、subscribers 和 schedulable 函数的工厂。

所有采用多个流或及时处理的运算符(甚至 subscribe_on 和 observe_on)都采用协调参数,而不是调度程序。

这里有一些提供的函数,它们将使用特定的调度程序进行协调。

  • identity_immediate()
  • identity_current_thread()
  • identity_same_worker(工人w)
  • serialize_event_loop()
  • serialize_new_thread()
  • serialize_same_worker(工人w)
  • observe_on_event_loop()
  • observe_on_new_thread()

还没有线程池调度程序。线程池调度程序需要依赖于线程池实现,因为我不想编写线程池。我的计划是为 windows 线程池和 apple 线程池以及 boost asio 执行器池制作一个调度程序。要回答的一个问题是这些特定于平台的构造是否应该存在于 rxcpp 存储库中或具有平台具体回购。

欢迎贡献、意见和想法!