如何在反应式扩展中创建 Hot observable

How to create Hot observable in reactive extension

我正在尝试将 Reactive 扩展与 Oracle AQ 结合使用。当一条消息出现在 Oracle Queue 上时,它会触发一个 "OracleAQMessageAvailableEvent",告诉消费者有一条消息。在 OracleAQMessageAvailableEventHandler 内部,消费者调用 OracleAQQueue.Dequeue() 来检索消息。

我已经在 RX 上完成了上述工作。以下是我使用的代码。

var messages = Observable.FromEventPattern<OracleAQMessageAvailableEventHandler, OracleAQMessageAvailableEventArgs> (
                    h => _queue.MessageAvailable += h, h => _queue.MessageAvailable -= h)
                .Where(x => x.EventArgs.AvailableMessages > 0)
                .Select(x =>
                {
                    OracleAQMessage msg = _queue.Dequeue();
                    return (UpdateMsg) msg.Payload;
                });
messages.subscribe(....)

问题是,如果我在一切正常时订阅消息,但如果我多次订阅消息(即我的应用程序中有多个消费者),那么每个消费者都会尝试调用“_queue.Dequeue()”,并且每个如果我们没有新消息,第一次呼叫后的呼叫将失败。

谁能指导我该怎么做。我认为,我的方案是针对 Hot Observable,但我正在努力解决这个问题。

我认为您在寻找 Hot Observable 是正确的。 如果我们遵循代码,您会看到 _queue.Dequeue(); 被多次调用的原因可能会更清楚。

首先您从 Oracle 订阅了事件

Observable.FromEventPattern<OracleAQMessageAvailableEventHandler, OracleAQMessageAvailableEventArgs> (
    h => _queue.MessageAvailable += h, 
    h => _queue.MessageAvailable -= h)

这就像在 Rx 之前的世界中连接事件处理程序一样。 每个收听(订阅)的人都会收到相同的事件。 如果他们在事件引发后订阅,那么他们就错过了。

然后你过滤掉空集。

.Where(x => x.EventArgs.AvailableMessages > 0)

没什么特别的。

然后从查询内部执行副作用。

.Select(x =>
    {
        OracleAQMessage msg = _queue.Dequeue();
        return (UpdateMsg) msg.Payload;
    });

这里的副作用是您正在进行破坏性读取 (Dequeue)。 所有订阅者在从上游 _queue.MessageAvailable 推送事件时都会尝试调用 Dequeue().

为避免所有订阅者调用副作用,您可以使序列成为热序列(如您所建议的)。 为此,您可以查看 Publish() 运算符。

Publish() 运算符将为您 return 一个 IConnectableObservable<T>,它只是通过添加 Connect() 方法来扩展 IObservable<T>。 这允许对何时执行订阅逻辑进行细粒度控制。 但是,这对您来说可能控制太多,您可能会发现 RefCount() 正是您所需要的。

Observable.FromEventPattern<OracleAQMessageAvailableEventHandler, OracleAQMessageAvailableEventArgs> (
    h => _queue.MessageAvailable += h, 
    h => _queue.MessageAvailable -= h)
.Where(x => x.EventArgs.AvailableMessages > 0)
.Select(x =>
    {
        OracleAQMessage msg = _queue.Dequeue();
        return (UpdateMsg) msg.Payload;
    })
.Publish()
.Refcount();

现在您的每个订阅者都会收到相同的消息,并且您的 Dequeue() 副作用在每个事件中只会被调用一次(并且只有在有订阅者的情况下)。

涵盖了热和冷 observable here

Lee Campbell,对不起我的错。您提到的解决方案确实有效。其实我用错了。我有一个 class 调用 QueueWrapper,它有一个 属性 调用消息。我有这个 Messages

的实现
    public IObservable<UpdateMsg> Messages { 
        get { return Observable.FromEventPattern<OracleAQMessageAvailableEventHandler,         OracleAQMessageAvailableEventArgs> (
        h => _queue.MessageAvailable += h, 
        h => _queue.MessageAvailable -= h)
       .Where(x => x.EventArgs.AvailableMessages > 0)
       .Select(x =>
        {
            OracleAQMessage msg = _queue.Dequeue();
            return (UpdateMsg) msg.Payload;
        })
       .Publish()
       .Refcount();
}}

并且我的客户端代码使用这样的消息 属性 进行订阅

// First Subscription
_queueWrapper.Messages.Subscribe(....)

// Second Subscription
_queueWrapper.Messages.Subscribe(....)

因此对于每个订阅,Messages 属性 是 returning 一个新的 IObservable。为了解决这个问题,我将 observable 的初始化移到了 QueueWrapper 的构造函数中,即以下代码:

    public QueueWrapper() {
     _messages = Observable.FromEventPattern<OracleAQMessageAvailableEventHandler, OracleAQMessageAvailableEventArgs> (
        h => _queue.MessageAvailable += h, 
        h => _queue.MessageAvailable -= h)
    .Where(x => x.EventArgs.AvailableMessages > 0)
    .Select(x =>
        {
            OracleAQMessage msg = _queue.Dequeue();
            return (UpdateMsg) msg.Payload;
        })
    .Publish()
    .Refcount();
}

我的消息 属性 只是 return _messages;

public IObservable<UpdateMsg> Messages { get { return _messages; } }

之后一切都按预期开始工作。