完成后的冷可观察集合
Cold observable collection with completion
所以也许我把这个观察者模式弄错了,但这就是我想要的。我有一个方法应该从数据库中获取一些整数。看起来像这样:
IObservable<int> GetInts()
每次他完成获取一个整数时,他自然应该通过 OnNext() 告诉它的订阅者。但是,如果旁观者迟到,他可能会错过其中的一些电话。我仍然想让他知道所有的信息,所以当他订阅 Observable 时应该 "fill him in" 并告诉他他错过的信息......然后在添加新东西时继续告诉他。
但是,在某个时候数据库读取将结束,方法会知道。那时他应该调用订阅者的 OnComplete()。所以基本上我想要的是一个可以告诉人们一些东西的系列。我一直没能找到做这样的事情的东西,所以我必须实施它。它可能看起来像:
class MyObservable:List<int>,IObservable<int>{
private bool completed;
private List<IObserver<int>> observers = new List<IObserver<int>>();
public bool Completed{
get{return completed;}
set{
completed = value;
if(completed == true){
observers.ForEach(obs=>obs.OnCompleted());
}
}
}
public IDisposable Subscribe(IObserver<int> obs){
observers.Add(obs);
this.ForEach(n => obs.OnNext(n));
if(Completed) obs.OnCompleted();
return Disposable.Create(()=>observers.Remove(obs));
}
public void Add(int n){
observers.ForEach(obs => obs.OnNext(n));
base.Add(n);
}
}
有更好的方法吗?像这样的东西已经在 Rx 中实现了吗?或者,如果我根本不应该这样做,我最好的选择是什么?
如果这是要走的路,我应该担心什么样的事情? (例如并发..等)。
你想要一个 ReplaySubject<int>
。它会记住它收到的所有更新。当观察者订阅它时,观察者会收到主题的历史记录,然后是所有新的更新。
试试这个例子:
var subject = new ReplaySubject<int>();
subject.Subscribe(n => Console.WriteLine("A: {0}", n));
subject.OnNext(1);
subject.OnNext(2);
subject.OnNext(3);
Console.ReadLine();
subject.Subscribe(n => Console.WriteLine("B: {0}", n));
Console.ReadLine();
subject.OnNext(4);
subject.OnNext(5);
您应该得到如下互动:
A: 1
A: 2
A: 3
<Press Enter>
B: 1
B: 2
B: 3
<Press Enter>
A: 4
B: 4
A: 5
B: 5
所以也许我把这个观察者模式弄错了,但这就是我想要的。我有一个方法应该从数据库中获取一些整数。看起来像这样:
IObservable<int> GetInts()
每次他完成获取一个整数时,他自然应该通过 OnNext() 告诉它的订阅者。但是,如果旁观者迟到,他可能会错过其中的一些电话。我仍然想让他知道所有的信息,所以当他订阅 Observable 时应该 "fill him in" 并告诉他他错过的信息......然后在添加新东西时继续告诉他。
但是,在某个时候数据库读取将结束,方法会知道。那时他应该调用订阅者的 OnComplete()。所以基本上我想要的是一个可以告诉人们一些东西的系列。我一直没能找到做这样的事情的东西,所以我必须实施它。它可能看起来像:
class MyObservable:List<int>,IObservable<int>{
private bool completed;
private List<IObserver<int>> observers = new List<IObserver<int>>();
public bool Completed{
get{return completed;}
set{
completed = value;
if(completed == true){
observers.ForEach(obs=>obs.OnCompleted());
}
}
}
public IDisposable Subscribe(IObserver<int> obs){
observers.Add(obs);
this.ForEach(n => obs.OnNext(n));
if(Completed) obs.OnCompleted();
return Disposable.Create(()=>observers.Remove(obs));
}
public void Add(int n){
observers.ForEach(obs => obs.OnNext(n));
base.Add(n);
}
}
有更好的方法吗?像这样的东西已经在 Rx 中实现了吗?或者,如果我根本不应该这样做,我最好的选择是什么?
如果这是要走的路,我应该担心什么样的事情? (例如并发..等)。
你想要一个 ReplaySubject<int>
。它会记住它收到的所有更新。当观察者订阅它时,观察者会收到主题的历史记录,然后是所有新的更新。
试试这个例子:
var subject = new ReplaySubject<int>();
subject.Subscribe(n => Console.WriteLine("A: {0}", n));
subject.OnNext(1);
subject.OnNext(2);
subject.OnNext(3);
Console.ReadLine();
subject.Subscribe(n => Console.WriteLine("B: {0}", n));
Console.ReadLine();
subject.OnNext(4);
subject.OnNext(5);
您应该得到如下互动:
A: 1
A: 2
A: 3
<Press Enter>
B: 1
B: 2
B: 3
<Press Enter>
A: 4
B: 4
A: 5
B: 5