我如何从过程代码中获取 IObservable?

How do I spoon feed an IObservable from procedural code?

大多数关于 Reactive Extensions 的示例代码都围绕着如何在序列上编写逻辑和运算符。

围绕 Observable 生成的部分主要围绕 "FromEventPatter"、"FromAsynch" 等

IObservable<string> observableHotStatus = ??;
foreach (var task in todo)
{
   //Process task;
   //Post status message into observable; How do I do this?
}

简而言之,我想要一个我可以 post 进入的对象,例如 ActionBlock、Action (of T) 或类似的东西。 实现此目标的最简单方法是什么?

编辑:

更仔细地检查您的代码,我建议使用 Observable.Create。即使它只有 returns 一个 cold observable,您可以将 Publish 运算符应用于生成的 observable 以使其成为 hot.

如果通过 task 你实际上指的是 Task<T>,那么你可以使用 Observable.Create 的重载,它允许你定义一个异步迭代器。例如:

IObservable<string> statuses = Observable.Create<string>(
  (observer, cancel) =>
  {
    foreach (var task in todo)
    {
      cancel.ThrowIfCancellationRequested();

      await task;
      observer.OnNext("Status");
    }
  });

上一个答案:

您可以使用以下类型之一,但我建议您在做出决定之前先阅读 To Use Subject or Not To Use Subject

  • Subject<T>:通用,"event"-like,hot observable。调用 OnNext 就像引发一个经典的 .NET 事件。
  • BehaviorSubject<T>:通常用作 属性 的支持字段,它表示可观察到的变化序列 "events"。每当观察者订阅时,它都会立即收到当前值,然后是 属性 的所有更改。您可以随时从 Value 属性 中提取当前值;例如,在您的 属性 的 getter 中。在您的 属性 的 setter 中调用 OnNext,您不必保留重复的支持字段。如果我对 FRP 的理解是正确的,它也是 Rx 版本的 continuous function and it's the only FRP-like 你会在 Rx 中找到的东西。
  • ReplaySubject<T>:一般用作"events"的历史缓冲区,它表示一个可观察的值序列,以观察者已经错过的值开始,每当观察者订阅时。您可以控制将值缓冲多长时间;这就像在价值观的历史上滑动window。您很少需要使用这种类型。在大多数情况下,Observable.Replay 运算符就可以了。
  • AsyncSubject<T>:一般用于捕获hot的结果,异步函数如Task<T>。您很少需要使用这种类型。在大多数情况下,Observable.FromAsyncPatternTask-转换运算符就可以了。