我如何从过程代码中获取 IObservable?
How do I spoon feed an IObservable from procedural code?
大多数关于 Reactive Extensions 的示例代码都围绕着如何在序列上编写逻辑和运算符。
围绕 Observable 生成的部分主要围绕 "FromEventPatter"、"FromAsynch" 等
IObservable<string> observableHotStatus = ??;
foreach (var task in todo)
{
//Process task;
//Post status message into observable; How do I do this?
}
简而言之,我想要一个我可以 post 进入的对象,例如 ActionBlock、Action (of T) 或类似的东西。
实现此目标的最简单方法是什么?
编辑:
更仔细地检查您的代码,我建议使用 Observable.Create
。即使它只有 returns 一个 cold observable,您可以将 Publish
运算符应用于生成的 observable 以使其成为 hot.
如果通过 task
你实际上指的是 Task<T>
,那么你可以使用 Observable.Create
的重载,它允许你定义一个异步迭代器。例如:
IObservable<string> statuses = Observable.Create<string>(
(observer, cancel) =>
{
foreach (var task in todo)
{
cancel.ThrowIfCancellationRequested();
await task;
observer.OnNext("Status");
}
});
上一个答案:
您可以使用以下类型之一,但我建议您在做出决定之前先阅读 To Use Subject or Not To Use Subject。
Subject<T>
:通用,"event"-like,hot observable。调用 OnNext
就像引发一个经典的 .NET 事件。
BehaviorSubject<T>
:通常用作 属性 的支持字段,它表示可观察到的变化序列 "events"。每当观察者订阅时,它都会立即收到当前值,然后是 属性 的所有更改。您可以随时从 Value
属性 中提取当前值;例如,在您的 属性 的 getter 中。在您的 属性 的 setter 中调用 OnNext
,您不必保留重复的支持字段。如果我对 FRP 的理解是正确的,它也是 Rx 版本的 continuous function and it's the only FRP-like 你会在 Rx 中找到的东西。
ReplaySubject<T>
:一般用作"events"的历史缓冲区,它表示一个可观察的值序列,以观察者已经错过的值开始,每当观察者订阅时。您可以控制将值缓冲多长时间;这就像在价值观的历史上滑动window。您很少需要使用这种类型。在大多数情况下,Observable.Replay
运算符就可以了。
AsyncSubject<T>
:一般用于捕获hot的结果,异步函数如Task<T>
。您很少需要使用这种类型。在大多数情况下,Observable.FromAsyncPattern
或 Task
-转换运算符就可以了。
大多数关于 Reactive Extensions 的示例代码都围绕着如何在序列上编写逻辑和运算符。
围绕 Observable 生成的部分主要围绕 "FromEventPatter"、"FromAsynch" 等
IObservable<string> observableHotStatus = ??;
foreach (var task in todo)
{
//Process task;
//Post status message into observable; How do I do this?
}
简而言之,我想要一个我可以 post 进入的对象,例如 ActionBlock、Action (of T) 或类似的东西。 实现此目标的最简单方法是什么?
编辑:
更仔细地检查您的代码,我建议使用 Observable.Create
。即使它只有 returns 一个 cold observable,您可以将 Publish
运算符应用于生成的 observable 以使其成为 hot.
如果通过 task
你实际上指的是 Task<T>
,那么你可以使用 Observable.Create
的重载,它允许你定义一个异步迭代器。例如:
IObservable<string> statuses = Observable.Create<string>(
(observer, cancel) =>
{
foreach (var task in todo)
{
cancel.ThrowIfCancellationRequested();
await task;
observer.OnNext("Status");
}
});
上一个答案:
您可以使用以下类型之一,但我建议您在做出决定之前先阅读 To Use Subject or Not To Use Subject。
Subject<T>
:通用,"event"-like,hot observable。调用OnNext
就像引发一个经典的 .NET 事件。BehaviorSubject<T>
:通常用作 属性 的支持字段,它表示可观察到的变化序列 "events"。每当观察者订阅时,它都会立即收到当前值,然后是 属性 的所有更改。您可以随时从Value
属性 中提取当前值;例如,在您的 属性 的 getter 中。在您的 属性 的 setter 中调用OnNext
,您不必保留重复的支持字段。如果我对 FRP 的理解是正确的,它也是 Rx 版本的 continuous function and it's the only FRP-like 你会在 Rx 中找到的东西。ReplaySubject<T>
:一般用作"events"的历史缓冲区,它表示一个可观察的值序列,以观察者已经错过的值开始,每当观察者订阅时。您可以控制将值缓冲多长时间;这就像在价值观的历史上滑动window。您很少需要使用这种类型。在大多数情况下,Observable.Replay
运算符就可以了。AsyncSubject<T>
:一般用于捕获hot的结果,异步函数如Task<T>
。您很少需要使用这种类型。在大多数情况下,Observable.FromAsyncPattern
或Task
-转换运算符就可以了。