并行观察多个IObservable

observe multiple IObservable parallel

我有一个 Session 和一个 ServiceService 可以有多个 Sessions。每个 Session 得到一个 IObservable<SessionState> 属性.

我想达到的目标:

如果所有Session.States都是NotConnected,那么Service应该调用Foo()。我已经有了解决方案,但我认为这不是 reactive way 解决方法。

public class Session
{
    public SessionState ActualState => _SessionStateSubject.Value;
    public IObservable<SessionState> State => _SessionStateSubject.AsObservable();
    private readonly BehaviorSubject<SessionState> _SessionStateSubject = new BehaviorSubject<SessionState>(SessionState.NotConnected);

    public void Initialize()
    {
        _SessionStateSubject.OnNext(SessionState.Initializing);
    }
}

public class Service
{
    private List<Session> _Sessions = new List<Session>();

    public void SubscribeToChannels()
    {
        foreach (Session session in _Sessions)
        {
            session.State.TakeUntil(s => s == SessionState.NotConnected).Subscribe(state =>
            {
                // check all other sessions
                if(_Sessions.All(s => s.ActualState == SessionState.NotConnected))
                    Foo();
            });
        }
    }

    public void Foo()
    {
        // this should be called if all of them are 'NotConnected'
    }
}

您可以使用 CombineLatest 获取所有项目最后已知状态的列表,只要有更改。从那里开始,它只是检查是否所有状态都是 NotConnected.

    public IDisposable SubscribeToChannels()
    {
        return Observable
               .CombineLatest(_Sessions.Select(s => s.State))
               .Subscribe(states =>
               {
                   if (states.All(s => s == SessionState.NotConnected))
                        Foo();
               });
    }