使 System.Reactive 重复新订阅的最后 X 项

Make System.Reactive repeat last X items on new subscribe

如何使用 System.Reactive 实现下一个逻辑?

IObservable<int> 当至少有一个订阅者存在并且当新订阅者订阅时,它会在计时器上生成新项目(整数),它会为新订阅者重复最后 X 个项目。 例子: 让 X = 5;

  1. SubscriberA 订阅并且 Observable 发出 1、2、3、4、5、6、7
  2. 订阅者 B 订阅并获得 3、4、5、6、7(最后 5 个项目),然后他们都继续收到新项目,如 8、9、10、11 等。
  3. 如果 SuscriberC 订阅,它将获得 7、8、9、10、11 等等。

这是我现在用于我的可观察对象的代码。我使用 catch 和 repeat to 因为 Collect() 可以抛出异常,所以订阅将继续。

IObservable<int> Responses = Observable
    .Defer(() => Observable.Interval(_pollInterval, scheduler)
    .SelectMany(_ => Collect(monitoringDataProvider).ToObservable())
    .Catch<int, Exception>(exception => Observable.Return(-1))
    .Repeat());

private int Collect(MonitoringDataProvider monitoringDataProvider)
{
    //..some logic that returns int;
}

您可能正在搜索 Replay 运算符:

public static IConnectableObservable<TSource> Replay<TSource>(
    this IObservable<TSource> source,
    int bufferSize
)

Returns a connectable observable sequence that shares a single subscription to the underlying sequence, replaying bufferSize notifications.

用法示例:

IObservable<int> ReplayedResponses = Responses
    .Replay(bufferSize: 5)
    .RefCount();

RefCount 运算符是连接到底层可观察对象的可用策略之一。其他的是 AutoConnect 运算符和手动 Connect 方法。