How to properly insert the Geonames 1.5GiB file into PostgreSQL using Node?
I've downloaded the Geonames database dump and I'm trying to get everything into a PostgreSQL table, but no matter what I try I keep running into multiple errors.
The last modification I made got me the following:
Error: Connection terminated by user
at Client.end (/media/DarkHawk/srv/Databases/PremadeDB/Geonames/node_modules/pg/lib/client.js:402:36)
at Pool._remove (/media/DarkHawk/srv/Databases/PremadeDB/Geonames/node_modules/pg-pool/index.js:135:12)
at Timeout.setTimeout (/media/DarkHawk/srv/Databases/PremadeDB/Geonames/node_modules/pg-pool/index.js:38:12)
at ontimeout (timers.js:498:11)
at tryOnTimeout (timers.js:323:5)
at Timer.listOnTimeout (timers.js:290:5)
Line added 6052 0.05135667935111022%
(node:31819) UnhandledPromiseRejectionWarning: Error: This socket is closed
at Socket._writeGeneric (net.js:729:18)
at Socket._write (net.js:783:8)
at doWrite (_stream_writable.js:397:12)
at writeOrBuffer (_stream_writable.js:383:5)
at Socket.Writable.write (_stream_writable.js:290:11)
at Socket.write (net.js:707:40)
at Connection.end (/media/DarkHawk/srv/Databases/PremadeDB/Geonames/node_modules/pg/lib/connection.js:318:22)
at global.Promise (/media/DarkHawk/srv/Databases/PremadeDB/Geonames/node_modules/pg/lib/client.js:410:23)
at new Promise (<anonymous>)
at Client.end (/media/DarkHawk/srv/Databases/PremadeDB/Geonames/node_modules/pg/lib/client.js:409:12)
(node:31819) UnhandledPromiseRejectionWarning: Unhandled promise rejection. This error originated either by throwing inside of an async function without a catch block, or by rejecting a promise which was not handled with .catch(). (rejection id: 1)
(node:31819) [DEP0018] DeprecationWarning: Unhandled promise rejections are deprecated. In the future, promise rejections that are not handled will terminate the Node.js process with a non-zero exit code.
(node:31819) MaxListenersExceededWarning: Possible EventEmitter memory leak detected. 11 end listeners added. Use emitter.setMaxListeners() to increase limit
My code is:
var pg = require("pg");
var fs = require('fs');
var es = require('event-stream');

const pool = new pg.Pool({
    user: 'smurf',
    host: 'localhost',
    database: 'mydb',
    password: 'smurf',
    port: 5432,
})

var filename = 'allCountries.txt';
var lineNr = 0;
var max = 11784251; // Total number of lines; dirty, but lets us show % of lines inserted

// Connect to Postgresql
pool.connect((err, client, done) => {
    if (err) throw err

    // Stream file line by line
    var s = fs.createReadStream(filename)
        .pipe(es.split())
        .pipe(es.mapSync(function(e) {

            // pause the readstream
            s.pause();
            lineNr += 1;

            // Each line needs to be properly formatted
            e = e.split("\t"); // TAB split

            // The following fields need formatting
            e[0] = parseInt(e[0]);
            e[4] = parseFloat(e[4]);
            e[5] = parseFloat(e[5]);
            e[14] = parseInt(e[14]);
            e[15] = e[15] == '' ? 0 : e[15];
            e[16] = parseInt(e[16]);

            // Insert into db
            pool.query('INSERT INTO geonames.rawdata (geonameid, name, asciiname, alternatenames, latitude, longitude, fclass, fcode, country, cc2, admin1, admin2, admin3, admin4, population, elevation, gtopo30, timezone, moddate) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16, $17, $18, $19);', e, function(err, result) {
                if (err) {
                    console.log(err);
                }
                done(); // Release this connection to the pool
                console.log("Line added ", lineNr, (lineNr / max * 100) + "%") // Monitor progress
                s.resume(); // Go to next line
            });
        })
        .on('error', function(err) {
            console.log('Error while reading file.', err);
        })
        .on('end', function() {
            console.log('Read entire file.')
        })
    );
}) // END pool.connect
I've tried using ReadFile, ReadFileSync, and the readline extension. I've tried omitting the done() call, or just moving it around.
I usually use PHP to insert massive files, so I have no idea what I'm doing wrong here.
The MaxListenersExceededWarning makes no sense to me, because it seems like I close everything I open. What am I doing wrong here?
Thanks!
As mentioned in the comments: when you're dealing with asynchronous code you need to use the map operation instead of mapSync, and call the callback once the item has been inserted.
If you do this, there's no longer a need to call pause and resume (that's handled by event-stream); you only need to resume the last stream you create. Then there's the question of when done should be called, namely: after all operations are complete.
Your code should look like this:
var pg = require("pg");
var fs = require('fs');
var es = require('event-stream');

const pool = new pg.Pool({
    user: 'smurf',
    host: 'localhost',
    database: 'mydb',
    password: 'smurf',
    port: 5432,
})

var filename = 'allCountries.txt';
var lineNr = 0;
var max = 11784251; // Total number of lines; dirty, but lets us show % of lines inserted

// Connect to Postgresql
pool.connect((err, client, done) => {
    if (err) throw err

    // Stream file line by line
    var s = fs.createReadStream(filename)
        .pipe(es.split())
        .pipe(es.map(function(e, cb) {
            lineNr += 1;

            // Each line needs to be properly formatted
            e = e.split("\t"); // TAB split

            // The following fields need formatting
            e[0] = parseInt(e[0]);
            e[4] = parseFloat(e[4]);
            e[5] = parseFloat(e[5]);
            e[14] = parseInt(e[14]);
            e[15] = e[15] == '' ? 0 : e[15];
            e[16] = parseInt(e[16]);

            // Insert into db
            pool.query('INSERT INTO geonames.rawdata (geonameid, name, asciiname, alternatenames, latitude, longitude, fclass, fcode, country, cc2, admin1, admin2, admin3, admin4, population, elevation, gtopo30, timezone, moddate) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16, $17, $18, $19);', e, function(err, result) {
                cb(err, result); // call the callback
                console.log("Line added ", lineNr, (lineNr / max * 100) + "%") // Monitor progress
            });
        })
        .resume()
        .on('error', function(err) {
            done();
            console.log('Error while reading file.', err);
        })
        .on('end', function() {
            done();
            console.log('Read entire file.')
        })
    );
}) // END pool.connect
The version above won't make use of the pool you're creating, though; it operates on one item at a time. The pool only helps when several queries are in flight at once, as the sketch below illustrates.
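A minimal standalone sketch of that difference (my own illustration, not part of the original answer), assuming the question's connection options:

const { Pool } = require("pg");

// Reuses the question's credentials for illustration only.
const pool = new Pool({ user: 'smurf', host: 'localhost', database: 'mydb', password: 'smurf', port: 5432 });

async function demo() {
    // Sequential, like the event-stream version above: the pool never
    // has more than one query in flight, so extra clients sit idle.
    for (let i = 0; i < 4; i++) {
        await pool.query('SELECT pg_sleep(0.1)');
    }

    // Concurrent: pool.query checks out a separate client per pending
    // query, up to the pool's max (10 by default), so these overlap.
    await Promise.all(
        Array.from({ length: 4 }, () => pool.query('SELECT pg_sleep(0.1)'))
    );
}

demo().then(() => pool.end());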
If you're using a fairly recent version of Node (8.4+), you can use my own framework, scramjet, which allows writing even simpler code using ES6 async functions:
const { Pool } = require("pg");
const { StringStream } = require("scramjet");
const fs = require("fs");

const pool = new Pool(options); // same connection options as in the question
const filename = 'allCountries.txt';
const max = 11784251;
let lineNr = 0;

const INSERT_ENTRY = 'INSERT INTO geonames.rawdata (geonameid, name, asciiname, alternatenames, latitude, longitude, fclass, fcode, country, cc2, admin1, admin2, admin3, admin4, population, elevation, gtopo30, timezone, moddate) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16, $17, $18, $19);';

StringStream
    .from(fs.createReadStream(filename))
    .lines()
    .parse(line => {
        // Each line needs to be properly formatted
        const entry = line.split("\t"); // TAB split

        // The following fields need formatting
        entry[0] = parseInt(entry[0]);
        entry[4] = parseFloat(entry[4]);
        entry[5] = parseFloat(entry[5]);
        entry[14] = parseInt(entry[14]);
        entry[15] = entry[15] == '' ? 0 : entry[15];
        entry[16] = parseInt(entry[16]);

        return entry;
    })
    .setOptions({ maxParallel: 32 })
    .each(async entry => {
        const client = await pool.connect();
        try {
            await client.query(INSERT_ENTRY, entry)
        } catch (err) {
            console.log('Error while adding line...', err);
            // some more logic could be here?
        } finally {
            client.release();
        }
    })
    .each(() => !(lineNr++ % 1000) && console.log("Line added ", lineNr, (lineNr / max * 100) + "%"))
    .run()
    .then(
        () => console.log('Read entire file.'),
        err => console.log('Error while handling file.', err)
    );
The code above will attempt to run 32 parallel inserts using the pool (requesting a client on every entry; the pg pool will reuse clients and add new ones up to its configured limit). That doesn't mean it will necessarily be 32 times faster, since there are external limiting factors, but you should see a drastic increase in throughput.
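One caveat (my own addition, not stated in the answer): pg's pool defaults to a maximum of 10 clients, so to actually keep 32 inserts in flight you would also raise the pool's limit, for example:

const { Pool } = require("pg");

// Assumed connection options (copied from the question); `max` is
// pg-pool's client cap, matched here to scramjet's maxParallel above.
const pool = new Pool({
    user: 'smurf',
    host: 'localhost',
    database: 'mydb',
    password: 'smurf',
    port: 5432,
    max: 32,
});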