Subscribe/Listen 到 "side events" 的 System.Reactive 流
Subscribe/Listen to "side events" of the stream with System.Reactive
我从响应式扩展开始,但我遇到了一个问题,我不确定我是否在正确的轨道上。
我正在使用 Observable 通过 .NET 为事件代理创建和使用侦听器。我创建了一个 "IncomingMessage" class,其中包含来自 eventbroker 的消息,我开始在 Observerable.Create 函数中创建侦听器。效果很好。
现在我还想从侦听器获取状态通知,如 "Connecting..."、"Connected."、"Closing..." 中那样,它们不是 IncomingMessage 所以我创建了一个 class "BrokerEvent" 带有 "Message" 属性 和 "IncomingMessage" 和 "BrokerEvent" 的接口。现在我通过 observer.OnNext(...) 发送它们。到目前为止,这也很有效。
但是在订阅者方面,我现在在过滤我需要的事件时遇到了一些问题。
我愿意:
GetObservable().Where(x => x is BrokerEvent ||
(x is IncomingMessage msg &&
msg.User == "test")).Subscribe(...)
这行得通,但是我需要再次弄清楚 Subscribe 中的类型,这看起来有点难看。
经过一番尝试,我现在最终做到了...
var observable = GetObservable().Publish();
observable.OfType<BrokerEvent>().Subscribe(...);
observable.OfType<IncomingMessage>().Where(x=>x.User == "test").Subscribe(...);
var disposable = observable.Connect();
这似乎也有效,但由于我是响应式扩展的新手,我不太确定这是否有任何不良副作用。我也不确定这是否是将状态消息包含到流中的 "right" 方式。有没有更好的方法来处理这个问题(可能不使用 Publish)或者这是要走的路?
要停止收听,仅处理我从 .Connect() 获得的一次性用品就足够了吗,还是我还必须处理我从 .Subscribe() 获得的两个一次性用品?
谢谢!
我假设 GetObservable
returns IObservable<object>
,这并不理想。
执行上述代码的最佳方法如下:
var observable = GetObservable().Publish();
var sub1 = observable.OfType<BrokerEvent>().Subscribe(_ => { });
var sub2 = observable.OfType<IncomingMessage>().Where(x => x.User == "test").Subscribe(_ => { });
var connectSub = observable.Connect();
var disposable = new CompositeDisposable(sub1, sub2, connectSub);
复合一次性用品将在处置时处置所有子级。
如果两个消息流彼此无关,则该方法可行。但是,由于您基本上有一个控制消息流和一个数据消息流,所以我假设来自一个的消息在处理另一个中的消息时可能很重要。在这种情况下,您可能希望将其视为一个流。
在这种情况下,您可能希望为您的可观察对象创建一个 Discriminated-Union 类型,这可能会使处理更容易。
您可以做的是创建一个 'event handler' class,其中包含 'process message' 的三个重载。一种用于不执行任何操作的对象(默认),一种用于状态类型,一种用于传入消息。在 .Subscribe 中使用此语法
m=>processor.Process((dynamic)m)
这将根据需要调用正确的实现或不执行任何操作。
如果你想在调用 Process 之前进行过滤,你可以引入一个通用的 class(ProcessableMesage 或类似的),或者你可以在你的 OfType 流上调用 .Merge,或者你可以采用相同的方法如上所述,通过动态 MessageFilter class.
我从响应式扩展开始,但我遇到了一个问题,我不确定我是否在正确的轨道上。
我正在使用 Observable 通过 .NET 为事件代理创建和使用侦听器。我创建了一个 "IncomingMessage" class,其中包含来自 eventbroker 的消息,我开始在 Observerable.Create 函数中创建侦听器。效果很好。
现在我还想从侦听器获取状态通知,如 "Connecting..."、"Connected."、"Closing..." 中那样,它们不是 IncomingMessage 所以我创建了一个 class "BrokerEvent" 带有 "Message" 属性 和 "IncomingMessage" 和 "BrokerEvent" 的接口。现在我通过 observer.OnNext(...) 发送它们。到目前为止,这也很有效。
但是在订阅者方面,我现在在过滤我需要的事件时遇到了一些问题。
我愿意:
GetObservable().Where(x => x is BrokerEvent ||
(x is IncomingMessage msg &&
msg.User == "test")).Subscribe(...)
这行得通,但是我需要再次弄清楚 Subscribe 中的类型,这看起来有点难看。
经过一番尝试,我现在最终做到了...
var observable = GetObservable().Publish();
observable.OfType<BrokerEvent>().Subscribe(...);
observable.OfType<IncomingMessage>().Where(x=>x.User == "test").Subscribe(...);
var disposable = observable.Connect();
这似乎也有效,但由于我是响应式扩展的新手,我不太确定这是否有任何不良副作用。我也不确定这是否是将状态消息包含到流中的 "right" 方式。有没有更好的方法来处理这个问题(可能不使用 Publish)或者这是要走的路?
要停止收听,仅处理我从 .Connect() 获得的一次性用品就足够了吗,还是我还必须处理我从 .Subscribe() 获得的两个一次性用品?
谢谢!
我假设 GetObservable
returns IObservable<object>
,这并不理想。
执行上述代码的最佳方法如下:
var observable = GetObservable().Publish();
var sub1 = observable.OfType<BrokerEvent>().Subscribe(_ => { });
var sub2 = observable.OfType<IncomingMessage>().Where(x => x.User == "test").Subscribe(_ => { });
var connectSub = observable.Connect();
var disposable = new CompositeDisposable(sub1, sub2, connectSub);
复合一次性用品将在处置时处置所有子级。
如果两个消息流彼此无关,则该方法可行。但是,由于您基本上有一个控制消息流和一个数据消息流,所以我假设来自一个的消息在处理另一个中的消息时可能很重要。在这种情况下,您可能希望将其视为一个流。
在这种情况下,您可能希望为您的可观察对象创建一个 Discriminated-Union 类型,这可能会使处理更容易。
您可以做的是创建一个 'event handler' class,其中包含 'process message' 的三个重载。一种用于不执行任何操作的对象(默认),一种用于状态类型,一种用于传入消息。在 .Subscribe 中使用此语法
m=>processor.Process((dynamic)m)
这将根据需要调用正确的实现或不执行任何操作。
如果你想在调用 Process 之前进行过滤,你可以引入一个通用的 class(ProcessableMesage 或类似的),或者你可以在你的 OfType 流上调用 .Merge,或者你可以采用相同的方法如上所述,通过动态 MessageFilter class.