如何延迟读取带有 node.js 或 javascript 的文件行,而不是非阻塞行为?
How to read lines of a file with node.js or javascript with delay, not in non-blocking behavior?
我正在阅读 node.js 中的一个文件(300,000 行)。我想以 5,000 行为一组将行发送到另一个应用程序 (Elasticsearch) 以存储它们。因此,每当我读完 5,000 行时,我想通过 API 将它们批量发送到 Elasticsearch 以存储它们,然后继续读取文件的其余部分并批量发送每 5,000 行。
如果我想使用 java(或任何其他阻塞语言,如 C、C++、python 等)来完成此任务,我将执行如下操作:
int countLines = 0;
String bulkString = "";
BufferedReader br = new BufferedReader(new InputStreamReader(new FileInputStream("filePath.txt")));
while ((currentLine = br.readLine()) != null) {
countLines++;
bulkString += currentLine;
if(countLines >= 5000){
//send bulkString to Elasticsearch via APIs
countLines = 0;
bulkString = "";
}
}
如果我想用 node.js 做同样的事情,我会做:
var countLines = 0;
var bulkString = "";
var instream = fs.createReadStream('filePath.txt');
var rl = readline.createInterface(instream, outstream);
rl.on('line', function(line) {
if(countLines >= 5000){
//send bulkString to via APIs
client.bulk({
index: 'indexName',
type: 'type',
body: [bulkString]
}, function (error, response) {
//task is done
});
countLines = 0;
bulkString = "";
}
}
与 node.js 的问题是它是非阻塞的,因此它不会在发送下一批之前等待第一个 API 响应线。我知道这可以算作 done.js 的一个好处,因为它不等待 I/O,但问题是它向 Elasticsearch 发送了太多数据。因此 Elasticsearch 的队列将变满并抛出异常。
我的问题 是如何让 node.js 在继续阅读下一行之前或之前等待 API 的响应它将下一批行发送到 Elasticsearch。
我知道我可以在 Elasticsearch 中设置一些参数来增加队列大小,但我对 node.js 针对此问题的阻止行为感兴趣。我熟悉回调的概念,但我想不出在这种情况下使用 callbacks 来防止 node.js 在非-阻塞模式。
在你的 if 之后使用 rl.pause()
,在你的 //task is done
之后使用 rl.resume()
。
请注意,您可能会在调用 pause 后多发生一些行事件。
皮埃尔的回答是正确的。我只想提交一段代码,展示我们如何从 node.js 的非阻塞概念中获益,但同时,不要一次用太多请求压垮 Elasticsearch。
这是一个伪代码,您可以使用它通过设置队列大小限制来为代码提供灵活性:
var countLines = 0;
var bulkString = "";
var queueSize = 3;//maximum of 3 requests will be sent to the Elasticsearch server
var batchesAlreadyInQueue = 0;
var instream = fs.createReadStream('filePath.txt');
var rl = readline.createInterface(instream, outstream);
rl.on('line', function(line) {
if(countLines >= 5000){
//send bulkString to via APIs
client.bulk({
index: 'indexName',
type: 'type',
body: [bulkString]
}, function (error, response) {
//task is done
batchesAlreadyInQueue--;//we will decrease a number of requests that are already sent to the Elasticsearch when we hear back from one of the requests
rl.resume();
});
if(batchesAlreadyInQueue >= queueSize){
rl.pause();
}
countLines = 0;
bulkString = "";
}
}
我正在阅读 node.js 中的一个文件(300,000 行)。我想以 5,000 行为一组将行发送到另一个应用程序 (Elasticsearch) 以存储它们。因此,每当我读完 5,000 行时,我想通过 API 将它们批量发送到 Elasticsearch 以存储它们,然后继续读取文件的其余部分并批量发送每 5,000 行。
如果我想使用 java(或任何其他阻塞语言,如 C、C++、python 等)来完成此任务,我将执行如下操作:
int countLines = 0;
String bulkString = "";
BufferedReader br = new BufferedReader(new InputStreamReader(new FileInputStream("filePath.txt")));
while ((currentLine = br.readLine()) != null) {
countLines++;
bulkString += currentLine;
if(countLines >= 5000){
//send bulkString to Elasticsearch via APIs
countLines = 0;
bulkString = "";
}
}
如果我想用 node.js 做同样的事情,我会做:
var countLines = 0;
var bulkString = "";
var instream = fs.createReadStream('filePath.txt');
var rl = readline.createInterface(instream, outstream);
rl.on('line', function(line) {
if(countLines >= 5000){
//send bulkString to via APIs
client.bulk({
index: 'indexName',
type: 'type',
body: [bulkString]
}, function (error, response) {
//task is done
});
countLines = 0;
bulkString = "";
}
}
与 node.js 的问题是它是非阻塞的,因此它不会在发送下一批之前等待第一个 API 响应线。我知道这可以算作 done.js 的一个好处,因为它不等待 I/O,但问题是它向 Elasticsearch 发送了太多数据。因此 Elasticsearch 的队列将变满并抛出异常。
我的问题 是如何让 node.js 在继续阅读下一行之前或之前等待 API 的响应它将下一批行发送到 Elasticsearch。
我知道我可以在 Elasticsearch 中设置一些参数来增加队列大小,但我对 node.js 针对此问题的阻止行为感兴趣。我熟悉回调的概念,但我想不出在这种情况下使用 callbacks 来防止 node.js 在非-阻塞模式。
在你的 if 之后使用 rl.pause()
,在你的 //task is done
之后使用 rl.resume()
。
请注意,您可能会在调用 pause 后多发生一些行事件。
皮埃尔的回答是正确的。我只想提交一段代码,展示我们如何从 node.js 的非阻塞概念中获益,但同时,不要一次用太多请求压垮 Elasticsearch。
这是一个伪代码,您可以使用它通过设置队列大小限制来为代码提供灵活性:
var countLines = 0;
var bulkString = "";
var queueSize = 3;//maximum of 3 requests will be sent to the Elasticsearch server
var batchesAlreadyInQueue = 0;
var instream = fs.createReadStream('filePath.txt');
var rl = readline.createInterface(instream, outstream);
rl.on('line', function(line) {
if(countLines >= 5000){
//send bulkString to via APIs
client.bulk({
index: 'indexName',
type: 'type',
body: [bulkString]
}, function (error, response) {
//task is done
batchesAlreadyInQueue--;//we will decrease a number of requests that are already sent to the Elasticsearch when we hear back from one of the requests
rl.resume();
});
if(batchesAlreadyInQueue >= queueSize){
rl.pause();
}
countLines = 0;
bulkString = "";
}
}