Rx.js 等待回调完成
Rx.js wait for callback to complete
我正在使用 Rx.js 来处理文件的内容,为每一行发出一个 http 请求,然后聚合结果。但是,源文件包含数千行,我正在超载我正在执行 http 请求的远程 http api。我需要确保在开始另一个请求之前等待现有的 http 请求回调。我愿意一次批处理和执行 n
请求,但对于此脚本,串行执行请求就足够了。
我有以下内容:
const fs = require('fs');
const rx = require('rx');
const rxNode = require('rx-node');
const doHttpRequest = rx.Observable.fromCallback((params, callback) => {
process.nextTick(() => {
callback('http response');
});
});
rxNode.fromReadableStream(fs.createReadStream('./source-file.txt'))
.flatMap(t => t.toString().split('\r\n'))
.take(5)
.concatMap(t => {
console.log('Submitting request');
return doHttpRequest(t);
})
.subscribe(results => {
console.log(results);
}, err => {
console.error('Error', err);
}, () => {
console.log('Completed');
});
但是,这不会连续执行 http 请求。它输出:
Submitting request
Submitting request
Submitting request
Submitting request
Submitting request
http response
http response
http response
http response
http response
Completed
如果我删除对 concatAll()
的调用,则请求是串行的,但我的订阅函数在 http 请求返回之前看到了可观察值。
如何串行执行 HTTP 请求,以便输出如下?
Submitting request
http response
Submitting request
http response
Submitting request
http response
Submitting request
http response
Submitting request
http response
Completed
这里的问题可能是当你使用rx.Observable.fromCallback
时,你传入参数的函数被立即执行。 observable returned 将保存稍后传递给回调的值。为了更好地了解正在发生的事情,您应该使用稍微复杂一些的模拟:为您的请求编号,让它们 return 一个您可以通过订阅观察到的实际(每个请求不同)结果。
我在这里发生的事情:
take(5)
发布 5 个值
map
发出 5 条日志消息,执行 5 个函数并传递 5 个可观察值
- 这 5 个 observables 由
concatAll
处理,这些 observables 发出的值将按预期顺序排列。您在这里订购的是调用函数的结果,而不是调用函数本身。
为了实现您的目标,您需要仅在 concatAll
订阅时调用您的可观察工厂 (rx.Observable.fromCallback
),而不是在创建时。为此,您可以使用 defer
:https://github.com/Reactive-Extensions/RxJS/blob/master/doc/api/core/operators/defer.md
所以你的代码会变成:
rxNode.fromReadableStream(fs.createReadStream('./path-to-file'))
.map(t => t.toString().split('\r\n'))
.flatMap(t => t)
.take(5)
.map(t => {
console.log('Submitting request');
return Observable.defer(function(){return doHttpRequest(t);})
})
.concatAll()
.subscribe(results => {
console.log(results);
}, err => {
console.error('Error', err);
}, () => {
console.log('Completed');
});
你可以在这里看到一个类似的问题,并有很好的解释:
您的日志可能仍会显示 5 条连续的 'Submitting request' 消息。但是您的请求应该按照您的意愿一个接一个地执行。
我正在使用 Rx.js 来处理文件的内容,为每一行发出一个 http 请求,然后聚合结果。但是,源文件包含数千行,我正在超载我正在执行 http 请求的远程 http api。我需要确保在开始另一个请求之前等待现有的 http 请求回调。我愿意一次批处理和执行 n
请求,但对于此脚本,串行执行请求就足够了。
我有以下内容:
const fs = require('fs');
const rx = require('rx');
const rxNode = require('rx-node');
const doHttpRequest = rx.Observable.fromCallback((params, callback) => {
process.nextTick(() => {
callback('http response');
});
});
rxNode.fromReadableStream(fs.createReadStream('./source-file.txt'))
.flatMap(t => t.toString().split('\r\n'))
.take(5)
.concatMap(t => {
console.log('Submitting request');
return doHttpRequest(t);
})
.subscribe(results => {
console.log(results);
}, err => {
console.error('Error', err);
}, () => {
console.log('Completed');
});
但是,这不会连续执行 http 请求。它输出:
Submitting request Submitting request Submitting request Submitting request Submitting request http response http response http response http response http response Completed
如果我删除对 concatAll()
的调用,则请求是串行的,但我的订阅函数在 http 请求返回之前看到了可观察值。
如何串行执行 HTTP 请求,以便输出如下?
Submitting request http response Submitting request http response Submitting request http response Submitting request http response Submitting request http response Completed
这里的问题可能是当你使用rx.Observable.fromCallback
时,你传入参数的函数被立即执行。 observable returned 将保存稍后传递给回调的值。为了更好地了解正在发生的事情,您应该使用稍微复杂一些的模拟:为您的请求编号,让它们 return 一个您可以通过订阅观察到的实际(每个请求不同)结果。
我在这里发生的事情:
take(5)
发布 5 个值map
发出 5 条日志消息,执行 5 个函数并传递 5 个可观察值- 这 5 个 observables 由
concatAll
处理,这些 observables 发出的值将按预期顺序排列。您在这里订购的是调用函数的结果,而不是调用函数本身。
为了实现您的目标,您需要仅在 concatAll
订阅时调用您的可观察工厂 (rx.Observable.fromCallback
),而不是在创建时。为此,您可以使用 defer
:https://github.com/Reactive-Extensions/RxJS/blob/master/doc/api/core/operators/defer.md
所以你的代码会变成:
rxNode.fromReadableStream(fs.createReadStream('./path-to-file'))
.map(t => t.toString().split('\r\n'))
.flatMap(t => t)
.take(5)
.map(t => {
console.log('Submitting request');
return Observable.defer(function(){return doHttpRequest(t);})
})
.concatAll()
.subscribe(results => {
console.log(results);
}, err => {
console.error('Error', err);
}, () => {
console.log('Completed');
});
你可以在这里看到一个类似的问题,并有很好的解释:
您的日志可能仍会显示 5 条连续的 'Submitting request' 消息。但是您的请求应该按照您的意愿一个接一个地执行。