将 EasynetQ 订阅者转变为可观察对象

Turning a EasynetQ subscriber into an observable

我在使用 RabbitMQ 时使用 EasyNetQ 作为客户端库。要创建订阅者,请按

bus.Subscribe<MyMessage>("my_subscription_id", msg => Console.WriteLine(msg.Text));

发布 MyMessage 的实例时,EasyNetQ 将调用委托并将消息的文本 属性 打印到控制台。

如何将其转换为可观察序列?我一直在查看 Observable.CreateObservable.Generate 方法,但我不知道如何桥接 RabbitMQ 消费者和可观察序列。

Subscribe 方法 returns 和 IDisposable 因此解决方案应该尊重这一点,以便可以正确处理资源。

我注意到了这个 solution 但大多数人似乎都建议不要使用 Subject 所以我想找到另一个解决方案。

欢迎任何提示或想法。

这个有用吗?

var observable = Observable.Create<MyMessage>(o => 
    bus.Subscribe<MyMessage>("my_subscription_id", msg => o.OnNext(msg))
);

我对引用代码中使用 Subject 的方式没有问题:如果 Subject 是私有(理想情况下 read-only)字段,仅作为IObservable 那么它的影响就被遏制了。但是,引用 link:

处的代码还有其他问题
  • 如果有多个.Connect()调用,那么就会有多个订阅,只有一个会被释放。这可能会导致泄漏(不知道 EasyNetQ 订阅是如何实现的)。

  • 最好不要实现IObservable(或者IConnectableObservable),因为很容易出现上面那样的错误