并行观察多个IObservable
observe multiple IObservable parallel
我有一个 Session
和一个 Service
。 Service
可以有多个 Sessions
。每个 Session
得到一个 IObservable<SessionState>
属性.
我想达到的目标:
如果所有Session.States
都是NotConnected
,那么Service
应该调用Foo()
。我已经有了解决方案,但我认为这不是 reactive way
解决方法。
public class Session
{
public SessionState ActualState => _SessionStateSubject.Value;
public IObservable<SessionState> State => _SessionStateSubject.AsObservable();
private readonly BehaviorSubject<SessionState> _SessionStateSubject = new BehaviorSubject<SessionState>(SessionState.NotConnected);
public void Initialize()
{
_SessionStateSubject.OnNext(SessionState.Initializing);
}
}
public class Service
{
private List<Session> _Sessions = new List<Session>();
public void SubscribeToChannels()
{
foreach (Session session in _Sessions)
{
session.State.TakeUntil(s => s == SessionState.NotConnected).Subscribe(state =>
{
// check all other sessions
if(_Sessions.All(s => s.ActualState == SessionState.NotConnected))
Foo();
});
}
}
public void Foo()
{
// this should be called if all of them are 'NotConnected'
}
}
您可以使用 CombineLatest
获取所有项目最后已知状态的列表,只要有更改。从那里开始,它只是检查是否所有状态都是 NotConnected
.
public IDisposable SubscribeToChannels()
{
return Observable
.CombineLatest(_Sessions.Select(s => s.State))
.Subscribe(states =>
{
if (states.All(s => s == SessionState.NotConnected))
Foo();
});
}
我有一个 Session
和一个 Service
。 Service
可以有多个 Sessions
。每个 Session
得到一个 IObservable<SessionState>
属性.
我想达到的目标:
如果所有Session.States
都是NotConnected
,那么Service
应该调用Foo()
。我已经有了解决方案,但我认为这不是 reactive way
解决方法。
public class Session
{
public SessionState ActualState => _SessionStateSubject.Value;
public IObservable<SessionState> State => _SessionStateSubject.AsObservable();
private readonly BehaviorSubject<SessionState> _SessionStateSubject = new BehaviorSubject<SessionState>(SessionState.NotConnected);
public void Initialize()
{
_SessionStateSubject.OnNext(SessionState.Initializing);
}
}
public class Service
{
private List<Session> _Sessions = new List<Session>();
public void SubscribeToChannels()
{
foreach (Session session in _Sessions)
{
session.State.TakeUntil(s => s == SessionState.NotConnected).Subscribe(state =>
{
// check all other sessions
if(_Sessions.All(s => s.ActualState == SessionState.NotConnected))
Foo();
});
}
}
public void Foo()
{
// this should be called if all of them are 'NotConnected'
}
}
您可以使用 CombineLatest
获取所有项目最后已知状态的列表,只要有更改。从那里开始,它只是检查是否所有状态都是 NotConnected
.
public IDisposable SubscribeToChannels()
{
return Observable
.CombineLatest(_Sessions.Select(s => s.State))
.Subscribe(states =>
{
if (states.All(s => s == SessionState.NotConnected))
Foo();
});
}