将 EasynetQ 订阅者转变为可观察对象
Turning a EasynetQ subscriber into an observable
我在使用 RabbitMQ 时使用 EasyNetQ 作为客户端库。要创建订阅者,请按
bus.Subscribe<MyMessage>("my_subscription_id", msg => Console.WriteLine(msg.Text));
发布 MyMessage 的实例时,EasyNetQ 将调用委托并将消息的文本 属性 打印到控制台。
如何将其转换为可观察序列?我一直在查看 Observable.Create
和 Observable.Generate
方法,但我不知道如何桥接 RabbitMQ 消费者和可观察序列。
Subscribe
方法 returns 和 IDisposable
因此解决方案应该尊重这一点,以便可以正确处理资源。
我注意到了这个 solution 但大多数人似乎都建议不要使用 Subject
所以我想找到另一个解决方案。
欢迎任何提示或想法。
这个有用吗?
var observable = Observable.Create<MyMessage>(o =>
bus.Subscribe<MyMessage>("my_subscription_id", msg => o.OnNext(msg))
);
我对引用代码中使用 Subject
的方式没有问题:如果 Subject
是私有(理想情况下 read-only)字段,仅作为IObservable
那么它的影响就被遏制了。但是,引用 link:
处的代码还有其他问题
如果有多个.Connect()
调用,那么就会有多个订阅,只有一个会被释放。这可能会导致泄漏(不知道 EasyNetQ 订阅是如何实现的)。
最好不要实现IObservable
(或者IConnectableObservable
),因为很容易出现上面那样的错误
我在使用 RabbitMQ 时使用 EasyNetQ 作为客户端库。要创建订阅者,请按
bus.Subscribe<MyMessage>("my_subscription_id", msg => Console.WriteLine(msg.Text));
发布 MyMessage 的实例时,EasyNetQ 将调用委托并将消息的文本 属性 打印到控制台。
如何将其转换为可观察序列?我一直在查看 Observable.Create
和 Observable.Generate
方法,但我不知道如何桥接 RabbitMQ 消费者和可观察序列。
Subscribe
方法 returns 和 IDisposable
因此解决方案应该尊重这一点,以便可以正确处理资源。
我注意到了这个 solution 但大多数人似乎都建议不要使用 Subject
所以我想找到另一个解决方案。
欢迎任何提示或想法。
这个有用吗?
var observable = Observable.Create<MyMessage>(o =>
bus.Subscribe<MyMessage>("my_subscription_id", msg => o.OnNext(msg))
);
我对引用代码中使用 Subject
的方式没有问题:如果 Subject
是私有(理想情况下 read-only)字段,仅作为IObservable
那么它的影响就被遏制了。但是,引用 link:
如果有多个
.Connect()
调用,那么就会有多个订阅,只有一个会被释放。这可能会导致泄漏(不知道 EasyNetQ 订阅是如何实现的)。最好不要实现
IObservable
(或者IConnectableObservable
),因为很容易出现上面那样的错误