完成后的冷可观察集合

Cold observable collection with completion

所以也许我把这个观察者模式弄错了,但这就是我想要的。我有一个方法应该从数据库中获取一些整数。看起来像这样:

IObservable<int> GetInts()

每次他完成获取一个整数时,他自然应该通过 OnNext() 告诉它的订阅者。但是,如果旁观者迟到,他可能会错过其中的一些电话。我仍然想让他知道所有的信息,所以当他订阅 Observable 时应该 "fill him in" 并告诉他他错过的信息......然后在添加新东西时继续告诉他。

但是,在某个时候数据库读取将结束,方法会知道。那时他应该调用订阅者的 OnComplete()。所以基本上我想要的是一个可以告诉人们一些东西的系列。我一直没能找到做这样的事情的东西,所以我必须实施它。它可能看起来像:

class MyObservable:List<int>,IObservable<int>{
private bool completed;
private List<IObserver<int>> observers = new List<IObserver<int>>();

public bool Completed{
    get{return completed;}
    set{
        completed = value;
        if(completed == true){
            observers.ForEach(obs=>obs.OnCompleted());
        }
    }
}

public IDisposable Subscribe(IObserver<int> obs){
    observers.Add(obs);
    this.ForEach(n => obs.OnNext(n));
    if(Completed) obs.OnCompleted();
    return Disposable.Create(()=>observers.Remove(obs));
}

public void Add(int n){
    observers.ForEach(obs => obs.OnNext(n));
    base.Add(n);
}   
}

有更好的方法吗?像这样的东西已经在 Rx 中实现了吗?或者,如果我根本不应该这样做,我最好的选择是什么?

如果这是要走的路,我应该担心什么样的事情? (例如并发..等)。

你想要一个 ReplaySubject<int>。它会记住它收到的所有更新。当观察者订阅它时,观察者会收到主题的历史记录,然后是所有新的更新。

试试这个例子:

var subject = new ReplaySubject<int>();
subject.Subscribe(n => Console.WriteLine("A: {0}", n));
subject.OnNext(1);
subject.OnNext(2);
subject.OnNext(3);
Console.ReadLine();
subject.Subscribe(n => Console.WriteLine("B: {0}", n));
Console.ReadLine();
subject.OnNext(4);
subject.OnNext(5);

您应该得到如下互动:

A: 1
A: 2
A: 3
<Press Enter>
B: 1
B: 2
B: 3
<Press Enter>
A: 4
B: 4
A: 5
B: 5