如何在反应式扩展中创建 Hot observable
How to create Hot observable in reactive extension
我正在尝试将 Reactive 扩展与 Oracle AQ 结合使用。当一条消息出现在 Oracle Queue 上时,它会触发一个 "OracleAQMessageAvailableEvent",告诉消费者有一条消息。在 OracleAQMessageAvailableEventHandler 内部,消费者调用 OracleAQQueue.Dequeue() 来检索消息。
我已经在 RX 上完成了上述工作。以下是我使用的代码。
var messages = Observable.FromEventPattern<OracleAQMessageAvailableEventHandler, OracleAQMessageAvailableEventArgs> (
h => _queue.MessageAvailable += h, h => _queue.MessageAvailable -= h)
.Where(x => x.EventArgs.AvailableMessages > 0)
.Select(x =>
{
OracleAQMessage msg = _queue.Dequeue();
return (UpdateMsg) msg.Payload;
});
messages.subscribe(....)
问题是,如果我在一切正常时订阅消息,但如果我多次订阅消息(即我的应用程序中有多个消费者),那么每个消费者都会尝试调用“_queue.Dequeue()”,并且每个如果我们没有新消息,第一次呼叫后的呼叫将失败。
谁能指导我该怎么做。我认为,我的方案是针对 Hot Observable,但我正在努力解决这个问题。
我认为您在寻找 Hot Observable 是正确的。
如果我们遵循代码,您会看到 _queue.Dequeue();
被多次调用的原因可能会更清楚。
首先您从 Oracle 订阅了事件
Observable.FromEventPattern<OracleAQMessageAvailableEventHandler, OracleAQMessageAvailableEventArgs> (
h => _queue.MessageAvailable += h,
h => _queue.MessageAvailable -= h)
这就像在 Rx 之前的世界中连接事件处理程序一样。
每个收听(订阅)的人都会收到相同的事件。
如果他们在事件引发后订阅,那么他们就错过了。
然后你过滤掉空集。
.Where(x => x.EventArgs.AvailableMessages > 0)
没什么特别的。
然后从查询内部执行副作用。
.Select(x =>
{
OracleAQMessage msg = _queue.Dequeue();
return (UpdateMsg) msg.Payload;
});
这里的副作用是您正在进行破坏性读取 (Dequeue
)。
所有订阅者在从上游 _queue.MessageAvailable
推送事件时都会尝试调用 Dequeue()
.
为避免所有订阅者调用副作用,您可以使序列成为热序列(如您所建议的)。
为此,您可以查看 Publish()
运算符。
Publish()
运算符将为您 return 一个 IConnectableObservable<T>
,它只是通过添加 Connect()
方法来扩展 IObservable<T>
。
这允许对何时执行订阅逻辑进行细粒度控制。
但是,这对您来说可能控制太多,您可能会发现 RefCount()
正是您所需要的。
Observable.FromEventPattern<OracleAQMessageAvailableEventHandler, OracleAQMessageAvailableEventArgs> (
h => _queue.MessageAvailable += h,
h => _queue.MessageAvailable -= h)
.Where(x => x.EventArgs.AvailableMessages > 0)
.Select(x =>
{
OracleAQMessage msg = _queue.Dequeue();
return (UpdateMsg) msg.Payload;
})
.Publish()
.Refcount();
现在您的每个订阅者都会收到相同的消息,并且您的 Dequeue()
副作用在每个事件中只会被调用一次(并且只有在有订阅者的情况下)。
涵盖了热和冷 observable here
Lee Campbell,对不起我的错。您提到的解决方案确实有效。其实我用错了。我有一个 class 调用 QueueWrapper,它有一个 属性 调用消息。我有这个 Messages
的实现
public IObservable<UpdateMsg> Messages {
get { return Observable.FromEventPattern<OracleAQMessageAvailableEventHandler, OracleAQMessageAvailableEventArgs> (
h => _queue.MessageAvailable += h,
h => _queue.MessageAvailable -= h)
.Where(x => x.EventArgs.AvailableMessages > 0)
.Select(x =>
{
OracleAQMessage msg = _queue.Dequeue();
return (UpdateMsg) msg.Payload;
})
.Publish()
.Refcount();
}}
并且我的客户端代码使用这样的消息 属性 进行订阅
// First Subscription
_queueWrapper.Messages.Subscribe(....)
// Second Subscription
_queueWrapper.Messages.Subscribe(....)
因此对于每个订阅,Messages 属性 是 returning 一个新的 IObservable。为了解决这个问题,我将 observable 的初始化移到了 QueueWrapper 的构造函数中,即以下代码:
public QueueWrapper() {
_messages = Observable.FromEventPattern<OracleAQMessageAvailableEventHandler, OracleAQMessageAvailableEventArgs> (
h => _queue.MessageAvailable += h,
h => _queue.MessageAvailable -= h)
.Where(x => x.EventArgs.AvailableMessages > 0)
.Select(x =>
{
OracleAQMessage msg = _queue.Dequeue();
return (UpdateMsg) msg.Payload;
})
.Publish()
.Refcount();
}
我的消息 属性 只是 return _messages;
public IObservable<UpdateMsg> Messages { get { return _messages; } }
之后一切都按预期开始工作。
我正在尝试将 Reactive 扩展与 Oracle AQ 结合使用。当一条消息出现在 Oracle Queue 上时,它会触发一个 "OracleAQMessageAvailableEvent",告诉消费者有一条消息。在 OracleAQMessageAvailableEventHandler 内部,消费者调用 OracleAQQueue.Dequeue() 来检索消息。
我已经在 RX 上完成了上述工作。以下是我使用的代码。
var messages = Observable.FromEventPattern<OracleAQMessageAvailableEventHandler, OracleAQMessageAvailableEventArgs> (
h => _queue.MessageAvailable += h, h => _queue.MessageAvailable -= h)
.Where(x => x.EventArgs.AvailableMessages > 0)
.Select(x =>
{
OracleAQMessage msg = _queue.Dequeue();
return (UpdateMsg) msg.Payload;
});
messages.subscribe(....)
问题是,如果我在一切正常时订阅消息,但如果我多次订阅消息(即我的应用程序中有多个消费者),那么每个消费者都会尝试调用“_queue.Dequeue()”,并且每个如果我们没有新消息,第一次呼叫后的呼叫将失败。
谁能指导我该怎么做。我认为,我的方案是针对 Hot Observable,但我正在努力解决这个问题。
我认为您在寻找 Hot Observable 是正确的。
如果我们遵循代码,您会看到 _queue.Dequeue();
被多次调用的原因可能会更清楚。
首先您从 Oracle 订阅了事件
Observable.FromEventPattern<OracleAQMessageAvailableEventHandler, OracleAQMessageAvailableEventArgs> (
h => _queue.MessageAvailable += h,
h => _queue.MessageAvailable -= h)
这就像在 Rx 之前的世界中连接事件处理程序一样。 每个收听(订阅)的人都会收到相同的事件。 如果他们在事件引发后订阅,那么他们就错过了。
然后你过滤掉空集。
.Where(x => x.EventArgs.AvailableMessages > 0)
没什么特别的。
然后从查询内部执行副作用。
.Select(x =>
{
OracleAQMessage msg = _queue.Dequeue();
return (UpdateMsg) msg.Payload;
});
这里的副作用是您正在进行破坏性读取 (Dequeue
)。
所有订阅者在从上游 _queue.MessageAvailable
推送事件时都会尝试调用 Dequeue()
.
为避免所有订阅者调用副作用,您可以使序列成为热序列(如您所建议的)。
为此,您可以查看 Publish()
运算符。
Publish()
运算符将为您 return 一个 IConnectableObservable<T>
,它只是通过添加 Connect()
方法来扩展 IObservable<T>
。
这允许对何时执行订阅逻辑进行细粒度控制。
但是,这对您来说可能控制太多,您可能会发现 RefCount()
正是您所需要的。
Observable.FromEventPattern<OracleAQMessageAvailableEventHandler, OracleAQMessageAvailableEventArgs> (
h => _queue.MessageAvailable += h,
h => _queue.MessageAvailable -= h)
.Where(x => x.EventArgs.AvailableMessages > 0)
.Select(x =>
{
OracleAQMessage msg = _queue.Dequeue();
return (UpdateMsg) msg.Payload;
})
.Publish()
.Refcount();
现在您的每个订阅者都会收到相同的消息,并且您的 Dequeue()
副作用在每个事件中只会被调用一次(并且只有在有订阅者的情况下)。
涵盖了热和冷 observable here
Lee Campbell,对不起我的错。您提到的解决方案确实有效。其实我用错了。我有一个 class 调用 QueueWrapper,它有一个 属性 调用消息。我有这个 Messages
的实现 public IObservable<UpdateMsg> Messages {
get { return Observable.FromEventPattern<OracleAQMessageAvailableEventHandler, OracleAQMessageAvailableEventArgs> (
h => _queue.MessageAvailable += h,
h => _queue.MessageAvailable -= h)
.Where(x => x.EventArgs.AvailableMessages > 0)
.Select(x =>
{
OracleAQMessage msg = _queue.Dequeue();
return (UpdateMsg) msg.Payload;
})
.Publish()
.Refcount();
}}
并且我的客户端代码使用这样的消息 属性 进行订阅
// First Subscription
_queueWrapper.Messages.Subscribe(....)
// Second Subscription
_queueWrapper.Messages.Subscribe(....)
因此对于每个订阅,Messages 属性 是 returning 一个新的 IObservable。为了解决这个问题,我将 observable 的初始化移到了 QueueWrapper 的构造函数中,即以下代码:
public QueueWrapper() {
_messages = Observable.FromEventPattern<OracleAQMessageAvailableEventHandler, OracleAQMessageAvailableEventArgs> (
h => _queue.MessageAvailable += h,
h => _queue.MessageAvailable -= h)
.Where(x => x.EventArgs.AvailableMessages > 0)
.Select(x =>
{
OracleAQMessage msg = _queue.Dequeue();
return (UpdateMsg) msg.Payload;
})
.Publish()
.Refcount();
}
我的消息 属性 只是 return _messages;
public IObservable<UpdateMsg> Messages { get { return _messages; } }
之后一切都按预期开始工作。