在 RX 中合并多个自定义可观察对象

Merging multiple custom observables in RX

尝试对使用 RX 从多个发布者发送通知的系统建模。

我有两个自定义接口 ITopicObservable 和 ITopicObserver 来模拟实现 类 除了 IObservable 和 IObserver 接口之外还有其他属性和方法的事实。

我的问题是我的想法是我应该能够将多个可观察对象添加在一起,将它们合并在一起并订阅一个观察者以提供来自所有合并的可观察对象的更新。然而,带有 "the issue" 注释的代码会抛出一个无效的转换异常。

用例是一些独立的传感器,每个传感器都监测一个盒子中的温度,例如将它们的所有报告汇总到一个温度报告,然后由温度健康监视器订阅。

我在这里错过了什么?或者有没有更好的方式使用RX实现场景?

下面的代码

using System;
using System.Reactive.Linq;
using System.Collections.Generic;

namespace test
{
class MainClass
{
    public static void Main (string[] args)
    {
        Console.WriteLine ("Hello World!");
        var to = new TopicObserver ();
        var s = new TopicObservable ("test");

        var agg = new AggregatedTopicObservable ();
        agg.Add (s);

        agg.Subscribe (to);
    }
}

public interface ITopicObservable<TType>:IObservable<TType>
{
    string Name{get;}
}

public class TopicObservable:ITopicObservable<int>
{
    public TopicObservable(string name)
    {
        Name = name;
    }
    #region IObservable implementation
    public IDisposable Subscribe (IObserver<int> observer)
    {
        return null;
    }
    #endregion
    #region ITopicObservable implementation
    public string Name { get;private set;}

    #endregion
}

public class AggregatedTopicObservable:ITopicObservable<int>
{
    List<TopicObservable> _topics;
    private ITopicObservable<int> _observable;
    private IDisposable _disposable;

    public AggregatedTopicObservable()
    {
        _topics = new List<TopicObservable>();
    }

    public void Add(ITopicObservable<int> observable)
    {
        _topics.Add ((TopicObservable)observable);
    }

    #region IObservable implementation
    public IDisposable Subscribe (IObserver<int> observer)
    {
        _observable = (ITopicObservable<int>)_topics.Merge ();

        _disposable = _observable.Subscribe(observer);

        return _disposable;
    }
    #endregion
    #region ITopicObservable implementation
    public string Name { get;private set;}
    #endregion

}



public interface ITopicObserver<TType>:IObserver<TType>
{
    string Name{get;}
}

public class TopicObserver:ITopicObserver<int>
{
    #region IObserver implementation
    public void OnNext (int value)
    {
        Console.WriteLine ("next {0}", value);
    }
    public void OnError (Exception error)
    {
        Console.WriteLine ("error {0}", error.Message);
    }
    public void OnCompleted ()
    {
        Console.WriteLine ("finished");
    }
    #endregion
    #region ITopicObserver implementation
    public string Name { get;private set;}
    #endregion

}
}

我的第一个想法是,您不应该实现 IObservable<T>,您应该通过将其公开为 属性 或方法的结果来组合它。

第二个想法是 Rx 中有运算符 excel 在 merging/aggregating 多个序列在一起。 你应该喜欢使用那些。

第三种,和第一种类似,一般不实现IObserver<T>,你只订阅observable序列,为每次回调提供委托(OnNextOnErrorOnComplete)

所以你的代码基本上被简化为

Console.WriteLine("Hello World!");
var topic1 = TopicListener("test1");
var topic2 = TopicListener("test2");

topic1.Merge(topic2)
    .Subscribe(
    val => { Console.WriteLine("One of the topics published this value {0}", val);},
    ex => { Console.WriteLine("One of the topics errored. Now the whole sequence is dead {0}", ex);},
    () => {Console.WriteLine("All topics have completed.");});

其中TopicListener(string)只是returnsIObservable<T>的一个方法。 TopicListener(string) 方法的实现最有可能使用 Observable.Create.

查看在基于主题的消息传递系统上映射 Rx 的示例可能会有所帮助。 这里有一个示例,说明如何在 TibRv 主题上分层 Rx https://github.com/LeeCampbell/RxCookbook/blob/master/IO/Comms/TibRvSample.linq

您正在使用的 .Merge(...) 运算符的签名是:

IObservable<TSource> Merge<TSource>(this IEnumerable<IObservable<TSource>> sources)

这个.Merge()返回的实际类型是:

System.Reactive.Linq.ObservableImpl.Merge`1[System.Int32]

...所以很明显调用 (ITopicObservable<int>)_topics.Merge(); 会失败。

Lee 建议不要执行 IObservable<>IObserver<> 中的任何一个是正确的。它会导致像上面那样的错误。

如果你必须这样做,我会这样做:

public interface ITopic
{
    string Name { get; }
}

public interface ITopicObservable<TType> : ITopic, IObservable<TType>
{ }

public interface ITopicSubject<TType> : ISubject<TType>, ITopicObservable<TType>
{ }

public interface ITopicObserver<TType> : ITopic, IObserver<TType>
{ }

public class Topic
{
    public string Name { get; private set; }

    public Topic(string name)
    {
        this.Name = name;
    }
}

public class TopicSubject : Topic, ITopicSubject<int>
{
    private Subject<int> _subject = new Subject<int>();

    public TopicSubject(string name)
        : base(name)
    { }

    public IDisposable Subscribe(IObserver<int> observer)
    {
        return _subject.Subscribe(observer);
    }

    public void OnNext(int value)
    {
        _subject.OnNext(value);
    }

    public void OnError(Exception error)
    {
        _subject.OnError(error);
    }

    public void OnCompleted()
    {
        _subject.OnCompleted();
    }
}

public class AggregatedTopicObservable : Topic, ITopicObservable<int>
{
    List<ITopicObservable<int>> _topics = new List<ITopicObservable<int>>();

    public AggregatedTopicObservable(string name)
        : base(name)
    { }

    public void Add(ITopicObservable<int> observable)
    {
        _topics.Add(observable);
    }

    public IDisposable Subscribe(IObserver<int> observer)
    {
        return _topics.Merge().Subscribe(observer);
    }
}

public class TopicObserver : Topic, ITopicObserver<int>
{
    private IObserver<int> _observer;

    public TopicObserver(string name)
        : base(name)
    {
        _observer =
            Observer
                .Create<int>(
                    value => Console.WriteLine("next {0}", value),
                    error => Console.WriteLine("error {0}", error.Message),
                    () => Console.WriteLine("finished"));
    }

    public void OnNext(int value)
    {
        _observer.OnNext(value);
    }
    public void OnError(Exception error)
    {
        _observer.OnError(error);
    }
    public void OnCompleted()
    {
        _observer.OnCompleted();
    }
}

并且 运行 它与:

var to = new TopicObserver("watching");
var ts1 = new TopicSubject("topic 1");
var ts2 = new TopicSubject("topic 2");

var agg = new AggregatedTopicObservable("agg");

agg.Add(ts1);
agg.Add(ts2);

agg.Subscribe(to);

ts1.OnNext(42);
ts1.OnCompleted();

ts2.OnNext(1);
ts2.OnCompleted();

给出:

next 42
next 1
finished

但是除了能够给所有东西起一个名字(我不确定它有什么帮助)之外,你总是可以这样做:

var to =
    Observer
        .Create<int>(
            value => Console.WriteLine("next {0}", value),
            error => Console.WriteLine("error {0}", error.Message),
            () => Console.WriteLine("finished"));

var ts1 = new Subject<int>();
var ts2 = new Subject<int>();

var agg = new [] { ts1, ts2 }.Merge();

agg.Subscribe(to);

ts1.OnNext(42);
ts1.OnCompleted();

ts2.OnNext(1);
ts2.OnCompleted();

没有接口的相同输出 类。

还有一个更有趣的方法。试试这个:

var to =
    Observer
        .Create<int>(
            value => Console.WriteLine("next {0}", value),
            error => Console.WriteLine("error {0}", error.Message),
            () => Console.WriteLine("finished"));

var agg = new Subject<IObservable<int>>();

agg.Merge().Subscribe(to);

var ts1 = new Subject<int>();
var ts2 = new Subject<int>();

agg.OnNext(ts1);
agg.OnNext(ts2);

ts1.OnNext(42);
ts1.OnCompleted();

ts2.OnNext(1);
ts2.OnCompleted();

var ts3 = new Subject<int>();

agg.OnNext(ts3);

ts3.OnNext(99);
ts3.OnCompleted();

这会产生:

next 42
next 1
next 99

它允许您在合并后添加新的源可观察对象!