在这种情况下最好实施 ObservableBase 还是有其他方法?

Is it the best to implement ObservableBase in this situation or is there another way?

首先,我没有找到自定义实现 ObservableBase 或 AnonymousObservable 的好例子。如果有的话,我不知道我需要实施哪一个。情况是这样的

我使用第三方库,有一个 class 让我们称之为 Producer,它允许我在其上设置一个委托,如 objProducer.Attach(MyHandler)。 MyHandler 将从 Producer 接收消息。我正在尝试围绕 Producer 创建一个包装器以使其可观察,并且理想情况下使其成为一种独特的类型,而不是仅创建一个可观察的实例(如 Observable.Create)。

已编辑:第三方制作人具有以下界面

public delegate void ProducerMessageHandler(Message objMessage);
public class Producer : IDisposable {
   public void Start();
   public void Attach(ProducerMessageHandler fnHandler);
   public void Dispose();
}

如前所述,我无法控制它的源代码。它旨在像这样使用:创建一个实例,调用 Attach 并传递一个委托,调用 Start,当 Producer 接收或生成它们时,它基本上在提供的委托内启动接收消息。

我正在考虑创建 public class ProducerObservable : ObservableBase<Message> 以便当有人订阅它时我会(Rx 库会)将消息推送给观察者。似乎我需要在我的 ProducerObservable 的构造函数中的某处调用 Attach,然后我需要以某种方式在附加到它的观察者上调用 OnNext。这是否意味着我必须编写所有这些代码:将观察者列表 LinkedList<IObserver<Message>> 添加到 class,然后在 ProducerObservable 上调用 SubscribeCore 抽象方法时添加观察者?然后显然我将能够枚举 MyHandler 中的 LinkedList<IObserver<Message>> 并为每个调用 OnNext。所有这些看起来都可行,但感觉并不完全正确。我希望 .net 响应式扩展能够更好地为这种情况做好准备,并且至少在基础 class.

的某个地方准备好 LinkedList<IObserver<Message>> 的实现

在使用 Rx 的代码中,"Producer" 对象通常是通过 public 属性或方法公开 IObservable<T> 实例的对象。 Producer class 本身实现 IObservable<T> 的情况不太常见,当它实现时,它通过使用 Rx 在引擎盖下完成繁重的工作来实现。您绝对不想自己实现 IObservable<T>

这是一个示例,其中可观察对象公开为 属性:

public class Producer
{
    public Producer(ThirdPartyLib.Producer p)
    {
        var c = Observable.Create(observer =>
        {
            ProducerMessageHandler h = msg => observer.OnNext(msg);
            p.Attach(h);
            p.Start();

            return Disposable.Empty;
        }).Publish();

        // Connect the observable the first time someone starts
        // observing
        Stream = Observable.Create(observer =>
        {
            var subscription = c.Subscribe(observer);
            if (Interlocked.Exchange(ref _connected, 1) == 0)
            {
                c.Connect();
            }

            return subscription;
        });
    }

    private int _connected;
    public IObservable<Message> Stream { get; private set; }
}

这里是我们通过委托给 Rx 实际实现 IObservable<T> 的相同示例:

public class Producer : IObservable<Message>
{
    public Producer(ThirdPartyLib.Producer p)
    {
        var c = Observable.Create(observer =>
        {
            ProducerMessageHandler h = msg => observer.OnNext(msg);
            p.Attach(h);
            p.Start();

            return Disposable.Empty;
        }).Publish();

        // Connect the observable the first time someone starts
        // observing
        _stream = Observable.Create(observer =>
        {
            var subscription = c.Subscribe(observer);
            if (Interlocked.Exchange(ref _connected, 1) == 0)
            {
                c.Connect();
            }

            return subscription;
        });
    }

    private IObservable<Message> _stream;

    // implement IObservable<T> by delegating to Rx
    public IDisposable Subscribe(IObserver<Message> observer)
    {
        return _stream.Subscribe(observer);
    }
}

以下是成为 Rx 应该做的事情 "friendly":

public static class ObservableProducer
{
    public static IObservable<Message> Create()
    {
        return 
            Observable.Using(() => new Producer(), p =>
                Observable.Create<Message>(o => 
                {
                    ProducerMessageHandler handler = m => o.OnNext(m);
                    p.Attach(handler);
                    return Disposable.Create(() => o.OnCompleted());
                }));
    }
}

你可以这样使用:

IObservable<Message> query = ObservableProducer.Create();

您应该允许为所有新订阅创建多个 Producer 实例 - 这就是 Rx 的工作方式。

但是,如果您只想要一个 Producer 实例,那么请考虑在此可观察对象上使用 .Publish()

以下是如何确保单个 Producer 实例是 "self-managing":

IObservable<Message> query = ObservableProducer.Create().Publish().RefCount();

这将在第一个订阅上创建一个 Producer 实例并保持 Producer 直到不再有任何订阅。这使得它 "self-managing" 和滚动你自己的更好的解决方案 class.

如果您必须实现自己的 class,那么您经常会犯错误。您作为此问题的答案添加的 class 我可以看到三个。

  1. 您在附加消息处理程序后实例化主题。如果生产者在附加过程中创建消息,您的代码将失败。
  2. 您没有跟踪订阅。如果您不跟踪您的订阅,那么您将无法处理它们。 Rx 查询可以占用昂贵的资源,因此您应该尽早处理它们。
  3. 在处理生产者之前,您不会就此主题致电 .OnCompleted()

这是我对你的 class 的实现:

public class ProducerObservable : IObservable<Message>, IDisposable
{
    private readonly Producer _Producer;
    private readonly Subject<Message> _Subject;
    private readonly CompositeDisposable _Disposables;

    public ProducerObservable()
    {
        _Subject = new Subject<Message>();
        ProducerMessageHandler fnHandler = m => _Subject.OnNext(m);

        _Producer = new Producer();
        _Producer.Attach(fnHandler);
        _Producer.Start();

        _Disposables = new CompositeDisposable();
        _Disposables.Add(_Producer);
        _Disposables.Add(_Subject);
    }

    public void Dispose()
    {
        _Subject.OnCompleted();
        _Disposables.Dispose();
    }

    public IDisposable Subscribe(IObserver<Message> objObserver)
    {
        var subscription = _Subject.Subscribe(objObserver);
        _Disposables.Add(subscription);
        return subscription;
    }
}

我还是不喜欢。在撰写本文时,我是 [system.reactive] 中获得银徽章的三个人之一(还没有人拥有金徽章),而且我从未实施过自己的 Observable。我才刚刚意识到我没有在这个问题上调用 .OnCompleted(),所以我回去编辑上面的代码。这是一个雷区。依靠内置运算符要好得多。

ObservableBase 存在的原因是为了帮助防止人们犯错误,但并不能阻止错误。

这次讨论给了我一个想法。不就是这个吗?

public class ProducerObservable : IObservable<Message>, IDisposable {
   private readonly Producer _Producer;
   private readonly Subject<Message> _Subject;

   public ProducerObservable() {
      _Produder = new Producer();
      _Producer.Attach(Message_Received);
      _Subject = new Subject<Message>();
      _Producer.Start();
   }

   public void Dispose() {
      _Producer.Dispose();
      _Subject.Dispose();
   }

   public IDisposable Subscribe(IObserver<Message> objObserver) {
      return _Subject.Subscribe(objObserver);
   }

   private void Message_Received(Message objMessage) {
      _Subject.OnNext(objMessage);
   }
}

因此,在我看来,我们避免了额外的级别、额外的可观察对象,只有一个可观察类型,基本上我只看到优点而没有缺点。