如何判断 Subject.OfType<T> 是否有订阅者
How to tell if a Subject.OfType<T> has any subscribers
所以我正在尝试使用 Rx.NET 组合一个小型消息总线。
public class Bus {
private readonly Subject<BaseCommand> _commands = new Subject<BaseComand>();
public void RegisterHandler<TCommand>(Action<TCommand> handler) where TCommand: BaseCommand {
_commands
.OfType<TCommand>()
.Publish()
.RefCount()
.Subscribe(handler);
}
public void SendCommand<TCommand>(TCommand command) where TCommand: BaseCommand {
_commands.OnNext(command);
}
}
这就是代码的要点。我想限制订阅,以便一种消息类型只能存在一个订阅。在添加新订阅之前,是否有检查来自 OfType<T>
的 Observable 是否有任何现有订阅?
我建议这样(我已经将您的 RegisterHandler 更改为具有 IDisposable return 类型,因此您实际上可以再次取消订阅):
public class Bus
{
private readonly Subject<BaseCommand> _commands = new Subject<BaseCommand>();
private class Counter<TCommand> where TCommand : BaseCommand
{
public static int Count;
}
public IDisposable RegisterHandler<TCommand>(Action<TCommand> handler, Action<Exception> OnError = null) where TCommand : BaseCommand
{
OnError = OnError ?? (Action<Exception>)((ex) => Dispatcher.CurrentDispatcher.Invoke(() => {throw ex; })); // alternative case of course only works if dispatcher is available
return Observable.Create<TCommand>(o =>
{
if (Interlocked.Increment(ref Counter<TCommand>.Count) > 1)
{
Interlocked.Decrement(ref Counter<TCommand>.Count);
o.OnError(new InvalidOperationException("Too many subscribers!"));
return Disposable.Empty;
}
var subscription = _commands
.OfType<TCommand>()
.Publish()
.RefCount()
.Subscribe(o);
var decrement = Disposable.Create(() =>
{
Interlocked.Decrement(ref Counter<TCommand>.Count);
});
return new CompositeDisposable(subscription, decrement);
})
.Subscribe(handler, OnError);
}
public void SendCommand<TCommand>(TCommand command) where TCommand : BaseCommand
{
_commands.OnNext(command);
}
}
编辑:我可能会将您的 RegisterHandler 函数的签名更改为
public IObservable<TCommand> RegisterHandler<TCommand>() where TCommand : BaseCommand
不过;节省了一些错误管理的麻烦(订阅者必须自己处理),并且您的消费者在订阅这些事件的时间和方式上更加自由。
所以我正在尝试使用 Rx.NET 组合一个小型消息总线。
public class Bus {
private readonly Subject<BaseCommand> _commands = new Subject<BaseComand>();
public void RegisterHandler<TCommand>(Action<TCommand> handler) where TCommand: BaseCommand {
_commands
.OfType<TCommand>()
.Publish()
.RefCount()
.Subscribe(handler);
}
public void SendCommand<TCommand>(TCommand command) where TCommand: BaseCommand {
_commands.OnNext(command);
}
}
这就是代码的要点。我想限制订阅,以便一种消息类型只能存在一个订阅。在添加新订阅之前,是否有检查来自 OfType<T>
的 Observable 是否有任何现有订阅?
我建议这样(我已经将您的 RegisterHandler 更改为具有 IDisposable return 类型,因此您实际上可以再次取消订阅):
public class Bus
{
private readonly Subject<BaseCommand> _commands = new Subject<BaseCommand>();
private class Counter<TCommand> where TCommand : BaseCommand
{
public static int Count;
}
public IDisposable RegisterHandler<TCommand>(Action<TCommand> handler, Action<Exception> OnError = null) where TCommand : BaseCommand
{
OnError = OnError ?? (Action<Exception>)((ex) => Dispatcher.CurrentDispatcher.Invoke(() => {throw ex; })); // alternative case of course only works if dispatcher is available
return Observable.Create<TCommand>(o =>
{
if (Interlocked.Increment(ref Counter<TCommand>.Count) > 1)
{
Interlocked.Decrement(ref Counter<TCommand>.Count);
o.OnError(new InvalidOperationException("Too many subscribers!"));
return Disposable.Empty;
}
var subscription = _commands
.OfType<TCommand>()
.Publish()
.RefCount()
.Subscribe(o);
var decrement = Disposable.Create(() =>
{
Interlocked.Decrement(ref Counter<TCommand>.Count);
});
return new CompositeDisposable(subscription, decrement);
})
.Subscribe(handler, OnError);
}
public void SendCommand<TCommand>(TCommand command) where TCommand : BaseCommand
{
_commands.OnNext(command);
}
}
编辑:我可能会将您的 RegisterHandler 函数的签名更改为
public IObservable<TCommand> RegisterHandler<TCommand>() where TCommand : BaseCommand
不过;节省了一些错误管理的麻烦(订阅者必须自己处理),并且您的消费者在订阅这些事件的时间和方式上更加自由。