在这种情况下最好实施 ObservableBase 还是有其他方法?
Is it the best to implement ObservableBase in this situation or is there another way?
首先,我没有找到自定义实现 ObservableBase 或 AnonymousObservable 的好例子。如果有的话,我不知道我需要实施哪一个。情况是这样的
我使用第三方库,有一个 class 让我们称之为 Producer,它允许我在其上设置一个委托,如 objProducer.Attach(MyHandler)。 MyHandler 将从 Producer 接收消息。我正在尝试围绕 Producer 创建一个包装器以使其可观察,并且理想情况下使其成为一种独特的类型,而不是仅创建一个可观察的实例(如 Observable.Create)。
已编辑:第三方制作人具有以下界面
public delegate void ProducerMessageHandler(Message objMessage);
public class Producer : IDisposable {
public void Start();
public void Attach(ProducerMessageHandler fnHandler);
public void Dispose();
}
如前所述,我无法控制它的源代码。它旨在像这样使用:创建一个实例,调用 Attach 并传递一个委托,调用 Start,当 Producer 接收或生成它们时,它基本上在提供的委托内启动接收消息。
我正在考虑创建 public class ProducerObservable : ObservableBase<Message>
以便当有人订阅它时我会(Rx 库会)将消息推送给观察者。似乎我需要在我的 ProducerObservable 的构造函数中的某处调用 Attach,然后我需要以某种方式在附加到它的观察者上调用 OnNext。这是否意味着我必须编写所有这些代码:将观察者列表 LinkedList<IObserver<Message>>
添加到 class,然后在 ProducerObservable 上调用 SubscribeCore 抽象方法时添加观察者?然后显然我将能够枚举 MyHandler 中的 LinkedList<IObserver<Message>>
并为每个调用 OnNext。所有这些看起来都可行,但感觉并不完全正确。我希望 .net 响应式扩展能够更好地为这种情况做好准备,并且至少在基础 class.
的某个地方准备好 LinkedList<IObserver<Message>>
的实现
在使用 Rx 的代码中,"Producer" 对象通常是通过 public 属性或方法公开 IObservable<T>
实例的对象。 Producer
class 本身实现 IObservable<T>
的情况不太常见,当它实现时,它通过使用 Rx
在引擎盖下完成繁重的工作来实现。您绝对不想自己实现 IObservable<T>
。
这是一个示例,其中可观察对象公开为 属性:
public class Producer
{
public Producer(ThirdPartyLib.Producer p)
{
var c = Observable.Create(observer =>
{
ProducerMessageHandler h = msg => observer.OnNext(msg);
p.Attach(h);
p.Start();
return Disposable.Empty;
}).Publish();
// Connect the observable the first time someone starts
// observing
Stream = Observable.Create(observer =>
{
var subscription = c.Subscribe(observer);
if (Interlocked.Exchange(ref _connected, 1) == 0)
{
c.Connect();
}
return subscription;
});
}
private int _connected;
public IObservable<Message> Stream { get; private set; }
}
这里是我们通过委托给 Rx 实际实现 IObservable<T>
的相同示例:
public class Producer : IObservable<Message>
{
public Producer(ThirdPartyLib.Producer p)
{
var c = Observable.Create(observer =>
{
ProducerMessageHandler h = msg => observer.OnNext(msg);
p.Attach(h);
p.Start();
return Disposable.Empty;
}).Publish();
// Connect the observable the first time someone starts
// observing
_stream = Observable.Create(observer =>
{
var subscription = c.Subscribe(observer);
if (Interlocked.Exchange(ref _connected, 1) == 0)
{
c.Connect();
}
return subscription;
});
}
private IObservable<Message> _stream;
// implement IObservable<T> by delegating to Rx
public IDisposable Subscribe(IObserver<Message> observer)
{
return _stream.Subscribe(observer);
}
}
以下是成为 Rx 应该做的事情 "friendly":
public static class ObservableProducer
{
public static IObservable<Message> Create()
{
return
Observable.Using(() => new Producer(), p =>
Observable.Create<Message>(o =>
{
ProducerMessageHandler handler = m => o.OnNext(m);
p.Attach(handler);
return Disposable.Create(() => o.OnCompleted());
}));
}
}
你可以这样使用:
IObservable<Message> query = ObservableProducer.Create();
您应该允许为所有新订阅创建多个 Producer
实例 - 这就是 Rx 的工作方式。
但是,如果您只想要一个 Producer
实例,那么请考虑在此可观察对象上使用 .Publish()
。
以下是如何确保单个 Producer
实例是 "self-managing":
IObservable<Message> query = ObservableProducer.Create().Publish().RefCount();
这将在第一个订阅上创建一个 Producer
实例并保持 Producer
直到不再有任何订阅。这使得它 "self-managing" 和滚动你自己的更好的解决方案 class.
如果您必须实现自己的 class,那么您经常会犯错误。您作为此问题的答案添加的 class 我可以看到三个。
- 您在附加消息处理程序后实例化主题。如果生产者在附加过程中创建消息,您的代码将失败。
- 您没有跟踪订阅。如果您不跟踪您的订阅,那么您将无法处理它们。 Rx 查询可以占用昂贵的资源,因此您应该尽早处理它们。
- 在处理生产者之前,您不会就此主题致电
.OnCompleted()
。
这是我对你的 class 的实现:
public class ProducerObservable : IObservable<Message>, IDisposable
{
private readonly Producer _Producer;
private readonly Subject<Message> _Subject;
private readonly CompositeDisposable _Disposables;
public ProducerObservable()
{
_Subject = new Subject<Message>();
ProducerMessageHandler fnHandler = m => _Subject.OnNext(m);
_Producer = new Producer();
_Producer.Attach(fnHandler);
_Producer.Start();
_Disposables = new CompositeDisposable();
_Disposables.Add(_Producer);
_Disposables.Add(_Subject);
}
public void Dispose()
{
_Subject.OnCompleted();
_Disposables.Dispose();
}
public IDisposable Subscribe(IObserver<Message> objObserver)
{
var subscription = _Subject.Subscribe(objObserver);
_Disposables.Add(subscription);
return subscription;
}
}
我还是不喜欢。在撰写本文时,我是 [system.reactive] 中获得银徽章的三个人之一(还没有人拥有金徽章),而且我从未实施过自己的 Observable。我才刚刚意识到我没有在这个问题上调用 .OnCompleted()
,所以我回去编辑上面的代码。这是一个雷区。依靠内置运算符要好得多。
ObservableBase
存在的原因是为了帮助防止人们犯错误,但并不能阻止错误。
这次讨论给了我一个想法。不就是这个吗?
public class ProducerObservable : IObservable<Message>, IDisposable {
private readonly Producer _Producer;
private readonly Subject<Message> _Subject;
public ProducerObservable() {
_Produder = new Producer();
_Producer.Attach(Message_Received);
_Subject = new Subject<Message>();
_Producer.Start();
}
public void Dispose() {
_Producer.Dispose();
_Subject.Dispose();
}
public IDisposable Subscribe(IObserver<Message> objObserver) {
return _Subject.Subscribe(objObserver);
}
private void Message_Received(Message objMessage) {
_Subject.OnNext(objMessage);
}
}
因此,在我看来,我们避免了额外的级别、额外的可观察对象,只有一个可观察类型,基本上我只看到优点而没有缺点。
首先,我没有找到自定义实现 ObservableBase 或 AnonymousObservable 的好例子。如果有的话,我不知道我需要实施哪一个。情况是这样的
我使用第三方库,有一个 class 让我们称之为 Producer,它允许我在其上设置一个委托,如 objProducer.Attach(MyHandler)。 MyHandler 将从 Producer 接收消息。我正在尝试围绕 Producer 创建一个包装器以使其可观察,并且理想情况下使其成为一种独特的类型,而不是仅创建一个可观察的实例(如 Observable.Create)。
已编辑:第三方制作人具有以下界面
public delegate void ProducerMessageHandler(Message objMessage);
public class Producer : IDisposable {
public void Start();
public void Attach(ProducerMessageHandler fnHandler);
public void Dispose();
}
如前所述,我无法控制它的源代码。它旨在像这样使用:创建一个实例,调用 Attach 并传递一个委托,调用 Start,当 Producer 接收或生成它们时,它基本上在提供的委托内启动接收消息。
我正在考虑创建 public class ProducerObservable : ObservableBase<Message>
以便当有人订阅它时我会(Rx 库会)将消息推送给观察者。似乎我需要在我的 ProducerObservable 的构造函数中的某处调用 Attach,然后我需要以某种方式在附加到它的观察者上调用 OnNext。这是否意味着我必须编写所有这些代码:将观察者列表 LinkedList<IObserver<Message>>
添加到 class,然后在 ProducerObservable 上调用 SubscribeCore 抽象方法时添加观察者?然后显然我将能够枚举 MyHandler 中的 LinkedList<IObserver<Message>>
并为每个调用 OnNext。所有这些看起来都可行,但感觉并不完全正确。我希望 .net 响应式扩展能够更好地为这种情况做好准备,并且至少在基础 class.
LinkedList<IObserver<Message>>
的实现
在使用 Rx 的代码中,"Producer" 对象通常是通过 public 属性或方法公开 IObservable<T>
实例的对象。 Producer
class 本身实现 IObservable<T>
的情况不太常见,当它实现时,它通过使用 Rx
在引擎盖下完成繁重的工作来实现。您绝对不想自己实现 IObservable<T>
。
这是一个示例,其中可观察对象公开为 属性:
public class Producer
{
public Producer(ThirdPartyLib.Producer p)
{
var c = Observable.Create(observer =>
{
ProducerMessageHandler h = msg => observer.OnNext(msg);
p.Attach(h);
p.Start();
return Disposable.Empty;
}).Publish();
// Connect the observable the first time someone starts
// observing
Stream = Observable.Create(observer =>
{
var subscription = c.Subscribe(observer);
if (Interlocked.Exchange(ref _connected, 1) == 0)
{
c.Connect();
}
return subscription;
});
}
private int _connected;
public IObservable<Message> Stream { get; private set; }
}
这里是我们通过委托给 Rx 实际实现 IObservable<T>
的相同示例:
public class Producer : IObservable<Message>
{
public Producer(ThirdPartyLib.Producer p)
{
var c = Observable.Create(observer =>
{
ProducerMessageHandler h = msg => observer.OnNext(msg);
p.Attach(h);
p.Start();
return Disposable.Empty;
}).Publish();
// Connect the observable the first time someone starts
// observing
_stream = Observable.Create(observer =>
{
var subscription = c.Subscribe(observer);
if (Interlocked.Exchange(ref _connected, 1) == 0)
{
c.Connect();
}
return subscription;
});
}
private IObservable<Message> _stream;
// implement IObservable<T> by delegating to Rx
public IDisposable Subscribe(IObserver<Message> observer)
{
return _stream.Subscribe(observer);
}
}
以下是成为 Rx 应该做的事情 "friendly":
public static class ObservableProducer
{
public static IObservable<Message> Create()
{
return
Observable.Using(() => new Producer(), p =>
Observable.Create<Message>(o =>
{
ProducerMessageHandler handler = m => o.OnNext(m);
p.Attach(handler);
return Disposable.Create(() => o.OnCompleted());
}));
}
}
你可以这样使用:
IObservable<Message> query = ObservableProducer.Create();
您应该允许为所有新订阅创建多个 Producer
实例 - 这就是 Rx 的工作方式。
但是,如果您只想要一个 Producer
实例,那么请考虑在此可观察对象上使用 .Publish()
。
以下是如何确保单个 Producer
实例是 "self-managing":
IObservable<Message> query = ObservableProducer.Create().Publish().RefCount();
这将在第一个订阅上创建一个 Producer
实例并保持 Producer
直到不再有任何订阅。这使得它 "self-managing" 和滚动你自己的更好的解决方案 class.
如果您必须实现自己的 class,那么您经常会犯错误。您作为此问题的答案添加的 class 我可以看到三个。
- 您在附加消息处理程序后实例化主题。如果生产者在附加过程中创建消息,您的代码将失败。
- 您没有跟踪订阅。如果您不跟踪您的订阅,那么您将无法处理它们。 Rx 查询可以占用昂贵的资源,因此您应该尽早处理它们。
- 在处理生产者之前,您不会就此主题致电
.OnCompleted()
。
这是我对你的 class 的实现:
public class ProducerObservable : IObservable<Message>, IDisposable
{
private readonly Producer _Producer;
private readonly Subject<Message> _Subject;
private readonly CompositeDisposable _Disposables;
public ProducerObservable()
{
_Subject = new Subject<Message>();
ProducerMessageHandler fnHandler = m => _Subject.OnNext(m);
_Producer = new Producer();
_Producer.Attach(fnHandler);
_Producer.Start();
_Disposables = new CompositeDisposable();
_Disposables.Add(_Producer);
_Disposables.Add(_Subject);
}
public void Dispose()
{
_Subject.OnCompleted();
_Disposables.Dispose();
}
public IDisposable Subscribe(IObserver<Message> objObserver)
{
var subscription = _Subject.Subscribe(objObserver);
_Disposables.Add(subscription);
return subscription;
}
}
我还是不喜欢。在撰写本文时,我是 [system.reactive] 中获得银徽章的三个人之一(还没有人拥有金徽章),而且我从未实施过自己的 Observable。我才刚刚意识到我没有在这个问题上调用 .OnCompleted()
,所以我回去编辑上面的代码。这是一个雷区。依靠内置运算符要好得多。
ObservableBase
存在的原因是为了帮助防止人们犯错误,但并不能阻止错误。
这次讨论给了我一个想法。不就是这个吗?
public class ProducerObservable : IObservable<Message>, IDisposable {
private readonly Producer _Producer;
private readonly Subject<Message> _Subject;
public ProducerObservable() {
_Produder = new Producer();
_Producer.Attach(Message_Received);
_Subject = new Subject<Message>();
_Producer.Start();
}
public void Dispose() {
_Producer.Dispose();
_Subject.Dispose();
}
public IDisposable Subscribe(IObserver<Message> objObserver) {
return _Subject.Subscribe(objObserver);
}
private void Message_Received(Message objMessage) {
_Subject.OnNext(objMessage);
}
}
因此,在我看来,我们避免了额外的级别、额外的可观察对象,只有一个可观察类型,基本上我只看到优点而没有缺点。