使 System.Reactive 重复新订阅的最后 X 项
Make System.Reactive repeat last X items on new subscribe
如何使用 System.Reactive 实现下一个逻辑?
IObservable<int>
当至少有一个订阅者存在并且当新订阅者订阅时,它会在计时器上生成新项目(整数),它会为新订阅者重复最后 X 个项目。
例子:
让 X = 5;
- SubscriberA 订阅并且 Observable 发出 1、2、3、4、5、6、7
- 订阅者 B 订阅并获得 3、4、5、6、7(最后 5 个项目),然后他们都继续收到新项目,如 8、9、10、11 等。
- 如果 SuscriberC 订阅,它将获得 7、8、9、10、11 等等。
这是我现在用于我的可观察对象的代码。我使用 catch 和 repeat to 因为 Collect()
可以抛出异常,所以订阅将继续。
IObservable<int> Responses = Observable
.Defer(() => Observable.Interval(_pollInterval, scheduler)
.SelectMany(_ => Collect(monitoringDataProvider).ToObservable())
.Catch<int, Exception>(exception => Observable.Return(-1))
.Repeat());
private int Collect(MonitoringDataProvider monitoringDataProvider)
{
//..some logic that returns int;
}
您可能正在搜索 Replay
运算符:
public static IConnectableObservable<TSource> Replay<TSource>(
this IObservable<TSource> source,
int bufferSize
)
Returns a connectable observable sequence that shares a single subscription to the underlying sequence, replaying bufferSize
notifications.
用法示例:
IObservable<int> ReplayedResponses = Responses
.Replay(bufferSize: 5)
.RefCount();
RefCount
运算符是连接到底层可观察对象的可用策略之一。其他的是 AutoConnect
运算符和手动 Connect
方法。
如何使用 System.Reactive 实现下一个逻辑?
IObservable<int>
当至少有一个订阅者存在并且当新订阅者订阅时,它会在计时器上生成新项目(整数),它会为新订阅者重复最后 X 个项目。
例子:
让 X = 5;
- SubscriberA 订阅并且 Observable 发出 1、2、3、4、5、6、7
- 订阅者 B 订阅并获得 3、4、5、6、7(最后 5 个项目),然后他们都继续收到新项目,如 8、9、10、11 等。
- 如果 SuscriberC 订阅,它将获得 7、8、9、10、11 等等。
这是我现在用于我的可观察对象的代码。我使用 catch 和 repeat to 因为 Collect()
可以抛出异常,所以订阅将继续。
IObservable<int> Responses = Observable
.Defer(() => Observable.Interval(_pollInterval, scheduler)
.SelectMany(_ => Collect(monitoringDataProvider).ToObservable())
.Catch<int, Exception>(exception => Observable.Return(-1))
.Repeat());
private int Collect(MonitoringDataProvider monitoringDataProvider)
{
//..some logic that returns int;
}
您可能正在搜索 Replay
运算符:
public static IConnectableObservable<TSource> Replay<TSource>(
this IObservable<TSource> source,
int bufferSize
)
Returns a connectable observable sequence that shares a single subscription to the underlying sequence, replaying
bufferSize
notifications.
用法示例:
IObservable<int> ReplayedResponses = Responses
.Replay(bufferSize: 5)
.RefCount();
RefCount
运算符是连接到底层可观察对象的可用策略之一。其他的是 AutoConnect
运算符和手动 Connect
方法。