将无限异步回调序列转换为 Observable 序列?

Convert infinite async callback sequence to Observable sequence?

假设我有以下基于异步回调的 "infinite" 序列,一段时间后我将其取消:

'use strict';

const timers = require('timers');

let cancelled = false;

function asyncOperation(callback) {
  const delayMsec = Math.floor(Math.random() * 10000) + 1;
  console.log(`Taking ${delayMsec}msec to process...`);
  timers.setTimeout(callback, delayMsec, null, delayMsec);
}

function cancellableSequence(callback) {
  asyncOperation((error, processTime) => {
    console.log('Did stuff');
    if (!cancelled) {
      process.nextTick(() => { cancellableSequence(callback); });
    } else {
      callback(null, processTime);
    }
  });
}

cancellableSequence((error, lastProcessTime) => {
  console.log('Cancelled');
});

timers.setTimeout(() => { cancelled = true; }, 0);

asyncOperation会至少执行并回调一次,取消消息不会立即显示,而是在asyncOperation完成后显示。对 asyncOperation 的调用次数取决于内部 delayMsec 值和最后传递给 setTimeout() 的延迟参数(试图表明这些是可变的)。

我开始学习 RxJS5,并认为可以将其转换为 Observable 序列 ("oooh, an Observable subscription can be unsubscribe()d - that looks neat!")。

然而,我尝试将 cancellableSequence 转变为 ES6 生成器(还有什么方法可以使无限大?)产生 Observable.bindNodeCallback(asyncOperation)() 结果立即产生,这在我的例子中是不希望的行为。

我不能使用 Observable.delay()Observable.timer(),因为我没有已知的一致间隔。 (asyncOperation 中的 Math.random(...) 试图表明我作为调用者不控制时间,回调发生 "some unknown time later.")

我失败的尝试:

'use strict';

const timers = require('timers');
const Rx = require('rxjs/Rx');

function asyncOperation(callback) {
  const delayMsec = Math.floor(Math.random() * 10000) + 1;
  console.log(`Taking ${delayMsec}msec to process...`);
  timers.setTimeout(callback, delayMsec, null, delayMsec);
}

const operationAsObservable = Rx.Observable.bindNodeCallback(asyncOperation);
function* generator() {
  while (true) {
    console.log('Yielding...');
    yield operationAsObservable();
  }
}

Rx.Observable.from(generator()).take(2).mergeMap(x => x).subscribe(
  x => console.log(`Process took: ${x}msec`),
  e => console.log(`Error: ${e}`),
  c => console.log('Complete')
)

输出的结果是:

Yielding...
Taking 2698msec to process...
Yielding...
Taking 2240msec to process...
Process took: 2240msec
Process took: 2698msec
Complete

产量立即发生。 Process took: xxx 输出在您预期的时间发生(分别在 2240 和 2698 毫秒之后)。

(平心而论,我关心 yield 之间的延迟的原因是 asyncOperation() 实际上是一个限速令牌桶库,它控制异步回调的速率 - 我的一个实现想保留。)

顺便说一句,我试图用延迟取消替换 take(2),但从未发生过:

const subscription = Rx.Observable.from(generator()).mergeMap(x => x).subscribe(
  x => console.log(`Process took: ${x}msec`),
  e => console.log(`Error: ${e}`),
  c => console.log('Complete')
)

console.log('Never gets here?');
timers.setTimeout(() => {
  console.log('Cancelling...');
  subscription.unsubscribe();
}, 0);

可以通过 RxJS 取消订阅来完成我正在尝试的事情吗? (我可以看到其他方法,例如 process.exec('node', ...) 到 运行 asyncOperation() 作为一个单独的过程,使我能够 process.kill(..) 等,但我们不要去那里。 ..).

我最初基于回调的实现是实现可取消序列的建议方法吗?

更新的解决方案:

在下面查看我对@user3743222 的回答的回复评论。这是我最终得到的结果(用 Observable.expand() 替换 ES6 生成器):

'use strict';

const timers = require('timers');
const Rx = require('rxjs/Rx');

function asyncOperation(callback) {
  const delayMsec = Math.floor(Math.random() * 10000) + 1;
  console.log(`Taking ${delayMsec}msec to process...`);
  timers.setTimeout(callback, delayMsec, null, delayMsec);
}

const operationAsObservable = Rx.Observable.bindNodeCallback(asyncOperation);

const subscription = Rx.Observable
  .defer(operationAsObservable)
  .expand(x => operationAsObservable())
  .subscribe(
    x => console.log(`Process took: ${x}msec`),
    e => console.log(`Error: ${e}`),
    c => console.log('Complete')
  );

subscription.add(() => {
  console.log('Cancelled');
});

timers.setTimeout(() => {
  console.log('Cancelling...');
  subscription.unsubscribe();
}, 0);

更新的解决方案 2:

这是我为替代 RxJS4 repeatWhen() 方法想出的:

'use strict';

const timers = require('timers');
const Rx = require('rx');

function asyncOperation(callback) {
  const delayMsec = Math.floor(Math.random() * 1000) + 1;
  console.log(`Taking ${delayMsec}msec to process...`);
  timers.setTimeout(callback, delayMsec, null, delayMsec);
}

const operationAsObservable = Rx.Observable.fromNodeCallback(asyncOperation);

const subscription = Rx.Observable
  .defer(operationAsObservable)
  .repeatWhen(x => x.takeWhile(y => true))
  .subscribe(
    x => console.log(`Process took: ${x}msec`),
    e => console.log(`Error: ${e}`),
    c => console.log('Complete')
  );

timers.setTimeout(() => {
  console.log('Cancelling...');
  subscription.dispose();
}, 10000);

您似乎在每次完成时都在重复一个动作。这看起来是 expandrepeatWhen.

的一个很好的用例

通常,这类似于:

Rx.Observable.just(false).expand(_ => {  
  return cancelled ? Rx.Observable.empty() : Rx.Observable.fromCallback(asyncAction)
})

您在任何时间点将 cancelled 设置为真,当当前操作完成时,它会停止循环。还没有测试过,所以我很想知道最后是否有效。

你可以看看关于轮询的类似问题:

  • How to build an rx poller that waits some interval AFTER the previous ajax promise resolves?

文档:

文档链接适用于 Rxjs 4,但与 v5 相比应该没有太大变化