将无限异步回调序列转换为 Observable 序列?
Convert infinite async callback sequence to Observable sequence?
假设我有以下基于异步回调的 "infinite" 序列,一段时间后我将其取消:
'use strict';
const timers = require('timers');
let cancelled = false;
function asyncOperation(callback) {
const delayMsec = Math.floor(Math.random() * 10000) + 1;
console.log(`Taking ${delayMsec}msec to process...`);
timers.setTimeout(callback, delayMsec, null, delayMsec);
}
function cancellableSequence(callback) {
asyncOperation((error, processTime) => {
console.log('Did stuff');
if (!cancelled) {
process.nextTick(() => { cancellableSequence(callback); });
} else {
callback(null, processTime);
}
});
}
cancellableSequence((error, lastProcessTime) => {
console.log('Cancelled');
});
timers.setTimeout(() => { cancelled = true; }, 0);
asyncOperation
会至少执行并回调一次,取消消息不会立即显示,而是在asyncOperation
完成后显示。对 asyncOperation
的调用次数取决于内部 delayMsec
值和最后传递给 setTimeout()
的延迟参数(试图表明这些是可变的)。
我开始学习 RxJS5,并认为可以将其转换为 Observable 序列 ("oooh, an Observable subscription can be unsubscribe()d - that looks neat!")。
然而,我尝试将 cancellableSequence
转变为 ES6 生成器(还有什么方法可以使无限大?)产生 Observable.bindNodeCallback(asyncOperation)()
结果立即产生,这在我的例子中是不希望的行为。
我不能使用 Observable.delay()
或 Observable.timer()
,因为我没有已知的一致间隔。 (asyncOperation
中的 Math.random(...) 试图表明我作为调用者不控制时间,回调发生 "some unknown time later.")
我失败的尝试:
'use strict';
const timers = require('timers');
const Rx = require('rxjs/Rx');
function asyncOperation(callback) {
const delayMsec = Math.floor(Math.random() * 10000) + 1;
console.log(`Taking ${delayMsec}msec to process...`);
timers.setTimeout(callback, delayMsec, null, delayMsec);
}
const operationAsObservable = Rx.Observable.bindNodeCallback(asyncOperation);
function* generator() {
while (true) {
console.log('Yielding...');
yield operationAsObservable();
}
}
Rx.Observable.from(generator()).take(2).mergeMap(x => x).subscribe(
x => console.log(`Process took: ${x}msec`),
e => console.log(`Error: ${e}`),
c => console.log('Complete')
)
输出的结果是:
Yielding...
Taking 2698msec to process...
Yielding...
Taking 2240msec to process...
Process took: 2240msec
Process took: 2698msec
Complete
产量立即发生。 Process took: xxx
输出在您预期的时间发生(分别在 2240 和 2698 毫秒之后)。
(平心而论,我关心 yield 之间的延迟的原因是 asyncOperation()
实际上是一个限速令牌桶库,它控制异步回调的速率 - 我的一个实现想保留。)
顺便说一句,我试图用延迟取消替换 take(2)
,但从未发生过:
const subscription = Rx.Observable.from(generator()).mergeMap(x => x).subscribe(
x => console.log(`Process took: ${x}msec`),
e => console.log(`Error: ${e}`),
c => console.log('Complete')
)
console.log('Never gets here?');
timers.setTimeout(() => {
console.log('Cancelling...');
subscription.unsubscribe();
}, 0);
可以通过 RxJS 取消订阅来完成我正在尝试的事情吗? (我可以看到其他方法,例如 process.exec('node', ...)
到 运行 asyncOperation()
作为一个单独的过程,使我能够 process.kill(..)
等,但我们不要去那里。 ..).
我最初基于回调的实现是实现可取消序列的建议方法吗?
更新的解决方案:
在下面查看我对@user3743222 的回答的回复评论。这是我最终得到的结果(用 Observable.expand()
替换 ES6 生成器):
'use strict';
const timers = require('timers');
const Rx = require('rxjs/Rx');
function asyncOperation(callback) {
const delayMsec = Math.floor(Math.random() * 10000) + 1;
console.log(`Taking ${delayMsec}msec to process...`);
timers.setTimeout(callback, delayMsec, null, delayMsec);
}
const operationAsObservable = Rx.Observable.bindNodeCallback(asyncOperation);
const subscription = Rx.Observable
.defer(operationAsObservable)
.expand(x => operationAsObservable())
.subscribe(
x => console.log(`Process took: ${x}msec`),
e => console.log(`Error: ${e}`),
c => console.log('Complete')
);
subscription.add(() => {
console.log('Cancelled');
});
timers.setTimeout(() => {
console.log('Cancelling...');
subscription.unsubscribe();
}, 0);
更新的解决方案 2:
这是我为替代 RxJS4 repeatWhen()
方法想出的:
'use strict';
const timers = require('timers');
const Rx = require('rx');
function asyncOperation(callback) {
const delayMsec = Math.floor(Math.random() * 1000) + 1;
console.log(`Taking ${delayMsec}msec to process...`);
timers.setTimeout(callback, delayMsec, null, delayMsec);
}
const operationAsObservable = Rx.Observable.fromNodeCallback(asyncOperation);
const subscription = Rx.Observable
.defer(operationAsObservable)
.repeatWhen(x => x.takeWhile(y => true))
.subscribe(
x => console.log(`Process took: ${x}msec`),
e => console.log(`Error: ${e}`),
c => console.log('Complete')
);
timers.setTimeout(() => {
console.log('Cancelling...');
subscription.dispose();
}, 10000);
您似乎在每次完成时都在重复一个动作。这看起来是 expand
或 repeatWhen
.
的一个很好的用例
通常,这类似于:
Rx.Observable.just(false).expand(_ => {
return cancelled ? Rx.Observable.empty() : Rx.Observable.fromCallback(asyncAction)
})
您在任何时间点将 cancelled
设置为真,当当前操作完成时,它会停止循环。还没有测试过,所以我很想知道最后是否有效。
你可以看看关于轮询的类似问题:
- How to build an rx poller that waits some interval AFTER the previous ajax promise resolves?
文档:
文档链接适用于 Rxjs 4,但与 v5 相比应该没有太大变化
假设我有以下基于异步回调的 "infinite" 序列,一段时间后我将其取消:
'use strict';
const timers = require('timers');
let cancelled = false;
function asyncOperation(callback) {
const delayMsec = Math.floor(Math.random() * 10000) + 1;
console.log(`Taking ${delayMsec}msec to process...`);
timers.setTimeout(callback, delayMsec, null, delayMsec);
}
function cancellableSequence(callback) {
asyncOperation((error, processTime) => {
console.log('Did stuff');
if (!cancelled) {
process.nextTick(() => { cancellableSequence(callback); });
} else {
callback(null, processTime);
}
});
}
cancellableSequence((error, lastProcessTime) => {
console.log('Cancelled');
});
timers.setTimeout(() => { cancelled = true; }, 0);
asyncOperation
会至少执行并回调一次,取消消息不会立即显示,而是在asyncOperation
完成后显示。对 asyncOperation
的调用次数取决于内部 delayMsec
值和最后传递给 setTimeout()
的延迟参数(试图表明这些是可变的)。
我开始学习 RxJS5,并认为可以将其转换为 Observable 序列 ("oooh, an Observable subscription can be unsubscribe()d - that looks neat!")。
然而,我尝试将 cancellableSequence
转变为 ES6 生成器(还有什么方法可以使无限大?)产生 Observable.bindNodeCallback(asyncOperation)()
结果立即产生,这在我的例子中是不希望的行为。
我不能使用 Observable.delay()
或 Observable.timer()
,因为我没有已知的一致间隔。 (asyncOperation
中的 Math.random(...) 试图表明我作为调用者不控制时间,回调发生 "some unknown time later.")
我失败的尝试:
'use strict';
const timers = require('timers');
const Rx = require('rxjs/Rx');
function asyncOperation(callback) {
const delayMsec = Math.floor(Math.random() * 10000) + 1;
console.log(`Taking ${delayMsec}msec to process...`);
timers.setTimeout(callback, delayMsec, null, delayMsec);
}
const operationAsObservable = Rx.Observable.bindNodeCallback(asyncOperation);
function* generator() {
while (true) {
console.log('Yielding...');
yield operationAsObservable();
}
}
Rx.Observable.from(generator()).take(2).mergeMap(x => x).subscribe(
x => console.log(`Process took: ${x}msec`),
e => console.log(`Error: ${e}`),
c => console.log('Complete')
)
输出的结果是:
Yielding...
Taking 2698msec to process...
Yielding...
Taking 2240msec to process...
Process took: 2240msec
Process took: 2698msec
Complete
产量立即发生。 Process took: xxx
输出在您预期的时间发生(分别在 2240 和 2698 毫秒之后)。
(平心而论,我关心 yield 之间的延迟的原因是 asyncOperation()
实际上是一个限速令牌桶库,它控制异步回调的速率 - 我的一个实现想保留。)
顺便说一句,我试图用延迟取消替换 take(2)
,但从未发生过:
const subscription = Rx.Observable.from(generator()).mergeMap(x => x).subscribe(
x => console.log(`Process took: ${x}msec`),
e => console.log(`Error: ${e}`),
c => console.log('Complete')
)
console.log('Never gets here?');
timers.setTimeout(() => {
console.log('Cancelling...');
subscription.unsubscribe();
}, 0);
可以通过 RxJS 取消订阅来完成我正在尝试的事情吗? (我可以看到其他方法,例如 process.exec('node', ...)
到 运行 asyncOperation()
作为一个单独的过程,使我能够 process.kill(..)
等,但我们不要去那里。 ..).
我最初基于回调的实现是实现可取消序列的建议方法吗?
更新的解决方案:
在下面查看我对@user3743222 的回答的回复评论。这是我最终得到的结果(用 Observable.expand()
替换 ES6 生成器):
'use strict';
const timers = require('timers');
const Rx = require('rxjs/Rx');
function asyncOperation(callback) {
const delayMsec = Math.floor(Math.random() * 10000) + 1;
console.log(`Taking ${delayMsec}msec to process...`);
timers.setTimeout(callback, delayMsec, null, delayMsec);
}
const operationAsObservable = Rx.Observable.bindNodeCallback(asyncOperation);
const subscription = Rx.Observable
.defer(operationAsObservable)
.expand(x => operationAsObservable())
.subscribe(
x => console.log(`Process took: ${x}msec`),
e => console.log(`Error: ${e}`),
c => console.log('Complete')
);
subscription.add(() => {
console.log('Cancelled');
});
timers.setTimeout(() => {
console.log('Cancelling...');
subscription.unsubscribe();
}, 0);
更新的解决方案 2:
这是我为替代 RxJS4 repeatWhen()
方法想出的:
'use strict';
const timers = require('timers');
const Rx = require('rx');
function asyncOperation(callback) {
const delayMsec = Math.floor(Math.random() * 1000) + 1;
console.log(`Taking ${delayMsec}msec to process...`);
timers.setTimeout(callback, delayMsec, null, delayMsec);
}
const operationAsObservable = Rx.Observable.fromNodeCallback(asyncOperation);
const subscription = Rx.Observable
.defer(operationAsObservable)
.repeatWhen(x => x.takeWhile(y => true))
.subscribe(
x => console.log(`Process took: ${x}msec`),
e => console.log(`Error: ${e}`),
c => console.log('Complete')
);
timers.setTimeout(() => {
console.log('Cancelling...');
subscription.dispose();
}, 10000);
您似乎在每次完成时都在重复一个动作。这看起来是 expand
或 repeatWhen
.
通常,这类似于:
Rx.Observable.just(false).expand(_ => {
return cancelled ? Rx.Observable.empty() : Rx.Observable.fromCallback(asyncAction)
})
您在任何时间点将 cancelled
设置为真,当当前操作完成时,它会停止循环。还没有测试过,所以我很想知道最后是否有效。
你可以看看关于轮询的类似问题:
- How to build an rx poller that waits some interval AFTER the previous ajax promise resolves?
文档:
文档链接适用于 Rxjs 4,但与 v5 相比应该没有太大变化