具有异步订阅者功能的 RxJS Observable
RxJS Observable with asynchronous subscriber function
我正在尝试做一些感觉应该很简单的事情,但事实证明却出奇地困难。
我有一个订阅 RabbitMQ 队列的函数。具体来说,这是这里的 Channel.consume 函数:http://www.squaremobius.net/amqp.node/channel_api.html#channel_consume
它 return 是一个使用订阅 ID 解决的承诺 - 稍后需要取消订阅 - 并且还有一个回调参数在消息从队列中拉出时调用。
当我想取消订阅队列时,我需要使用此处的 Channel.cancel 函数取消消费者:http://www.squaremobius.net/amqp.node/channel_api.html#channel_cancel。这需要之前 returned 的订阅 ID。
我想将所有这些东西包装在一个 Observable 中,该 Observable 在订阅 observable 时订阅队列,并在取消订阅 observable 时取消订阅。然而,由于调用的 'double-asynchronous' 性质,这被证明有些困难(我的意思是说它们既有回调又有 return 承诺)。
理想情况下,我希望能够编写的代码是:
return new Rx.Observable(async (subscriber) => {
var consumeResult = await channel.consume(queueName, (message) => subscriber.next(message));
return async () => {
await channel.cancel(consumeResult.consumerTag);
};
});
但是,这是不可能的,因为此构造函数不支持异步订阅者函数或拆卸逻辑。
我一直想不通。我在这里错过了什么吗?为什么这么难?
干杯,
亚历克斯
创建的可观察对象不需要等待 channel.consume
promise 解析,因为观察者(它是传递的观察者,而不是订阅者)仅在您提供的函数中被调用。
但是,您 return 的退订功能必须等待该承诺解决。它可以在内部做到这一点,就像这样:
return new Rx.Observable((observer) => {
var consumeResult = channel.consume(queueName, (message) => observer.next(message));
return () => {
consumeResult.then(() => channel.cancel(consumeResult.consumerTag));
};
});
我正在尝试做一些感觉应该很简单的事情,但事实证明却出奇地困难。
我有一个订阅 RabbitMQ 队列的函数。具体来说,这是这里的 Channel.consume 函数:http://www.squaremobius.net/amqp.node/channel_api.html#channel_consume
它 return 是一个使用订阅 ID 解决的承诺 - 稍后需要取消订阅 - 并且还有一个回调参数在消息从队列中拉出时调用。
当我想取消订阅队列时,我需要使用此处的 Channel.cancel 函数取消消费者:http://www.squaremobius.net/amqp.node/channel_api.html#channel_cancel。这需要之前 returned 的订阅 ID。
我想将所有这些东西包装在一个 Observable 中,该 Observable 在订阅 observable 时订阅队列,并在取消订阅 observable 时取消订阅。然而,由于调用的 'double-asynchronous' 性质,这被证明有些困难(我的意思是说它们既有回调又有 return 承诺)。
理想情况下,我希望能够编写的代码是:
return new Rx.Observable(async (subscriber) => {
var consumeResult = await channel.consume(queueName, (message) => subscriber.next(message));
return async () => {
await channel.cancel(consumeResult.consumerTag);
};
});
但是,这是不可能的,因为此构造函数不支持异步订阅者函数或拆卸逻辑。
我一直想不通。我在这里错过了什么吗?为什么这么难?
干杯, 亚历克斯
创建的可观察对象不需要等待 channel.consume
promise 解析,因为观察者(它是传递的观察者,而不是订阅者)仅在您提供的函数中被调用。
但是,您 return 的退订功能必须等待该承诺解决。它可以在内部做到这一点,就像这样:
return new Rx.Observable((observer) => {
var consumeResult = channel.consume(queueName, (message) => observer.next(message));
return () => {
consumeResult.then(() => channel.cancel(consumeResult.consumerTag));
};
});