使用 mysql-node 流式传输数据时无法暂停连接池
Cannot pause pool when streaming data with mysql-node
我使用 Node.js 和 mysql 包将数据从 Node.js 服务端流式传输到客户端。
这个想法是,
定义一个池,并基于池进行查询。
然后将流式返回的行推入一个数组。
如果该数组达到一定长度,则暂停流,处理这些行,并通过 WebSocket 将它们发送到客户端。
恢复流。如此重复,直到没有剩余的行。
我正在按照 mysql npm 页面上的示例进行操作,但我得到了 pool.pause is not a function
这是代码
// Shared MySQL connection pool (at most 100 connections); host, credentials
// and database name are read from `config`.
var pool = mysql.createPool({
  connectionLimit: 100,
  host: config.host,
  user: config.user,
  password: config.password,
  database: config.database,
});
/**
 * Promise wrapper around the callback-style `pool.query`.
 *
 * @param {string} str - SQL text, forwarded unchanged to `pool.query`.
 * @param {Array} ar - Placeholder values, forwarded unchanged to `pool.query`.
 * @returns {Promise<{results: *, fields: *}>} Resolves with the callback's
 *   `results` and `fields`; rejects with the callback's `error` when one is set.
 */
const query = (str, ar) => {
  const runOnPool = (resolve, reject) => {
    const onDone = (error, results, fields) => {
      if (!error) {
        resolve({ results, fields });
        return;
      }
      reject(error);
    };
    pool.query(str, ar, onDone);
  };
  return new Promise(runOnPool);
};
/**
 * Sends user rows of one category to a websocket client.
 *
 * Counts the matching rows first; only when the count is above 5000 does it
 * start the row-by-row query and try to hand rows over in batches of 100.
 *
 * NOTE(review): this is the snippet that, per the surrounding text, fails with
 * "TypeError: pool.pause is not a function". It is kept as reported.
 *
 * @param ws   object whose `send()` method receives the JSON payload
 * @param data object whose `category` property is used as the query parameter
 */
const userdetails = (ws, data) => {
//do a check, unrelated to streaming
query('SELECT COUNT(id) as countrows FROM users WHERE category = ? ', [data.category])
// NOTE(review): the callback parameter below is also named `data`, so inside
// the callback `data` is the {results, fields} object, not the outer argument.
.then((data)=>{
if(data.results[0].countrows > 5000){
// if more than 5000, we stream
// the following is based on the mysql code found in their page
// it has no relation to the promise-based query above
// NOTE(review): this `var query` is local to the .then callback and shadows the
// promise helper of the same name there; `data.category` here reads from the
// count result because of the shadowing noted above.
var query = pool.query('SELECT id, name, address, sale, preexisting, amount FROM users WHERE category = ? ', [data.category])
query.on('result', row => {
// NOTE(review): `rowsToProcess` is not declared anywhere in this snippet.
rowsToProcess.push(row);
if (rowsToProcess.length >= 100) {
// This is the call reported to throw: pause() is invoked on the pool object.
pool.pause();
processRows();
}
});
query.on('end', () => {
// Final call once the query reports that it has ended.
processRows();
});
// Declared after the handlers above are registered; they only call it later,
// when events fire. The `done` parameter is never used.
const processRows = (done) => {
//process some data
//send them back using websockets
// NOTE(review): `data` here is the count result (see shadowing note), and
// `rowsToProcess` is neither sent nor emptied in this snippet.
ws.send(JSON.stringify({ data }));
pool.resume();
}
}
})
}
我不知道这是否与执行简单查询、Promise、使用连接池或其他什么有关。它抛出了 TypeError: pool.pause is not a function
而我无法修复它。请指教。
谢谢
使用这个
// Shared MySQL connection pool (at most 100 connections); host, credentials
// and database name are read from `config`.
var pool = mysql.createPool({
  connectionLimit: 100,
  host: config.host,
  user: config.user,
  password: config.password,
  database: config.database,
});
/**
 * Promise wrapper around the callback-style `pool.query`.
 *
 * @param {string} str - SQL text, forwarded unchanged to `pool.query`.
 * @param {Array} ar - Placeholder values, forwarded unchanged to `pool.query`.
 * @returns {Promise<{results: *, fields: *}>} Resolves with the callback's
 *   `results` and `fields`; rejects with the callback's `error` when one is set.
 */
const query = (str, ar) => {
  const runOnPool = (resolve, reject) => {
    const onDone = (error, results, fields) => {
      if (!error) {
        resolve({ results, fields });
        return;
      }
      reject(error);
    };
    pool.query(str, ar, onDone);
  };
  return new Promise(runOnPool);
};
/**
 * Streams the users of one category to a websocket client in batches.
 *
 * A COUNT query runs first; only when more than 5000 rows match are the rows
 * streamed. Rows are buffered and flushed to `ws` every 100 rows, plus once
 * more for the remainder when the query ends.
 *
 * pause()/resume() are connection methods, not pool methods, so a dedicated
 * connection is checked out of the pool for the streaming query and released
 * once the query has ended.
 *
 * @param {{send: function(string): *}} ws - Websocket-like object; each batch
 *   is sent as the JSON string of `{ data: [...rows] }`.
 * @param {{category: *}} data - Request payload; `category` filters the users.
 * @returns {undefined} Nothing; failures are logged with `console.error`.
 */
const userdetails = (ws, data) => {
  const STREAM_THRESHOLD = 5000;
  const BATCH_SIZE = 100;
  // Read the category up front so no callback parameter can shadow it.
  const { category } = data;

  const streamRows = () => {
    pool.getConnection((connectionError, connection) => {
      if (connectionError) {
        console.error('userdetails: could not get a pooled connection', connectionError);
        return;
      }

      let rowsToProcess = [];

      // Sends the buffered rows (if any) and starts a fresh buffer.
      const processRows = () => {
        if (rowsToProcess.length === 0) {
          return;
        }
        const batch = rowsToProcess;
        rowsToProcess = [];
        //process some data
        //send them back using websockets
        ws.send(JSON.stringify({ data: batch }));
      };

      connection
        .query('SELECT id, name, address, sale, preexisting, amount FROM users WHERE category = ? ', [category])
        .on('error', (queryError) => {
          // 'end' is still emitted after an error, so the release below runs.
          console.error('userdetails: streaming query failed', queryError);
        })
        .on('result', (row) => {
          rowsToProcess.push(row);
          if (rowsToProcess.length >= BATCH_SIZE) {
            // Stop receiving rows while the batch is handled. If the handling
            // ever becomes asynchronous, move resume() into its completion.
            connection.pause();
            processRows();
            connection.resume();
          }
        })
        .on('end', () => {
          processRows();
          connection.release();
        });
    });
  };

  //do a check, unrelated to streaming
  query('SELECT COUNT(id) as countrows FROM users WHERE category = ? ', [category])
    .then(({ results }) => {
      // if more than 5000, we stream
      if (results[0].countrows > STREAM_THRESHOLD) {
        streamRows();
      }
    })
    .catch((error) => {
      console.error('userdetails: row count query failed', error);
    });
};
您可以试试这个解决方案,
这个我用过很多次了:
/**
 * Runs a query on `connection` as a stream and gathers every emitted item.
 *
 * @param {string} queryString - SQL text passed to `connection.query`.
 * @param {Array} params - Placeholder values passed to `connection.query`.
 * @returns {Promise<Array>} Resolves with all items from the stream's 'data'
 *   events once 'end' fires; rejects with the value of an 'error' event.
 */
const mysqlStreamQueryPromise = (queryString, params) => {
  const collect = (resolve, reject) => {
    const rowStream = connection.query(queryString, params).stream();
    const rows = [];

    // The stream is paused while an item is stored, then resumed right away.
    const onRow = (row) => {
      rowStream.pause();
      rows.push(row);
      rowStream.resume();
    };

    rowStream.on('data', onRow);
    rowStream.on('end', () => resolve(rows));
    rowStream.on('error', (err) => reject(err));
  };
  return new Promise(collect);
};
我使用 Node.js 和 mysql 包将数据从 Node.js 服务端流式传输到客户端。
这个想法是,
定义一个池,并基于池进行查询。
然后将流式返回的行推入一个数组。
如果该数组达到一定长度,则暂停流,处理这些行,并通过 WebSocket 将它们发送到客户端。
恢复流。如此重复,直到没有剩余的行。
我正在按照 mysql npm 页面上的示例进行操作,但我得到了 pool.pause is not a function
这是代码
// Shared MySQL connection pool (at most 100 connections); host, credentials
// and database name are read from `config`.
var pool = mysql.createPool({
  connectionLimit: 100,
  host: config.host,
  user: config.user,
  password: config.password,
  database: config.database,
});
/**
 * Promise wrapper around the callback-style `pool.query`.
 *
 * @param {string} str - SQL text, forwarded unchanged to `pool.query`.
 * @param {Array} ar - Placeholder values, forwarded unchanged to `pool.query`.
 * @returns {Promise<{results: *, fields: *}>} Resolves with the callback's
 *   `results` and `fields`; rejects with the callback's `error` when one is set.
 */
const query = (str, ar) => {
  const runOnPool = (resolve, reject) => {
    const onDone = (error, results, fields) => {
      if (!error) {
        resolve({ results, fields });
        return;
      }
      reject(error);
    };
    pool.query(str, ar, onDone);
  };
  return new Promise(runOnPool);
};
/**
 * Sends user rows of one category to a websocket client.
 *
 * Counts the matching rows first; only when the count is above 5000 does it
 * start the row-by-row query and try to hand rows over in batches of 100.
 *
 * NOTE(review): this is the snippet that, per the surrounding text, fails with
 * "TypeError: pool.pause is not a function". It is kept as reported.
 *
 * @param ws   object whose `send()` method receives the JSON payload
 * @param data object whose `category` property is used as the query parameter
 */
const userdetails = (ws, data) => {
//do a check, unrelated to streaming
query('SELECT COUNT(id) as countrows FROM users WHERE category = ? ', [data.category])
// NOTE(review): the callback parameter below is also named `data`, so inside
// the callback `data` is the {results, fields} object, not the outer argument.
.then((data)=>{
if(data.results[0].countrows > 5000){
// if more than 5000, we stream
// the following is based on the mysql code found in their page
// it has no relation to the promise-based query above
// NOTE(review): this `var query` is local to the .then callback and shadows the
// promise helper of the same name there; `data.category` here reads from the
// count result because of the shadowing noted above.
var query = pool.query('SELECT id, name, address, sale, preexisting, amount FROM users WHERE category = ? ', [data.category])
query.on('result', row => {
// NOTE(review): `rowsToProcess` is not declared anywhere in this snippet.
rowsToProcess.push(row);
if (rowsToProcess.length >= 100) {
// This is the call reported to throw: pause() is invoked on the pool object.
pool.pause();
processRows();
}
});
query.on('end', () => {
// Final call once the query reports that it has ended.
processRows();
});
// Declared after the handlers above are registered; they only call it later,
// when events fire. The `done` parameter is never used.
const processRows = (done) => {
//process some data
//send them back using websockets
// NOTE(review): `data` here is the count result (see shadowing note), and
// `rowsToProcess` is neither sent nor emptied in this snippet.
ws.send(JSON.stringify({ data }));
pool.resume();
}
}
})
}
我不知道这是否与执行简单查询、Promise、使用连接池或其他什么有关。它抛出了 TypeError: pool.pause is not a function
而我无法修复它。请指教。
谢谢
使用这个
// Shared MySQL connection pool (at most 100 connections); host, credentials
// and database name are read from `config`.
var pool = mysql.createPool({
  connectionLimit: 100,
  host: config.host,
  user: config.user,
  password: config.password,
  database: config.database,
});
/**
 * Promise wrapper around the callback-style `pool.query`.
 *
 * @param {string} str - SQL text, forwarded unchanged to `pool.query`.
 * @param {Array} ar - Placeholder values, forwarded unchanged to `pool.query`.
 * @returns {Promise<{results: *, fields: *}>} Resolves with the callback's
 *   `results` and `fields`; rejects with the callback's `error` when one is set.
 */
const query = (str, ar) => {
  const runOnPool = (resolve, reject) => {
    const onDone = (error, results, fields) => {
      if (!error) {
        resolve({ results, fields });
        return;
      }
      reject(error);
    };
    pool.query(str, ar, onDone);
  };
  return new Promise(runOnPool);
};
/**
 * Streams the users of one category to a websocket client in batches.
 *
 * A COUNT query runs first; only when more than 5000 rows match are the rows
 * streamed. Rows are buffered and flushed to `ws` every 100 rows, plus once
 * more for the remainder when the query ends.
 *
 * pause()/resume() are connection methods, not pool methods, so a dedicated
 * connection is checked out of the pool for the streaming query and released
 * once the query has ended.
 *
 * @param {{send: function(string): *}} ws - Websocket-like object; each batch
 *   is sent as the JSON string of `{ data: [...rows] }`.
 * @param {{category: *}} data - Request payload; `category` filters the users.
 * @returns {undefined} Nothing; failures are logged with `console.error`.
 */
const userdetails = (ws, data) => {
  const STREAM_THRESHOLD = 5000;
  const BATCH_SIZE = 100;
  // Read the category up front so no callback parameter can shadow it.
  const { category } = data;

  const streamRows = () => {
    pool.getConnection((connectionError, connection) => {
      if (connectionError) {
        console.error('userdetails: could not get a pooled connection', connectionError);
        return;
      }

      let rowsToProcess = [];

      // Sends the buffered rows (if any) and starts a fresh buffer.
      const processRows = () => {
        if (rowsToProcess.length === 0) {
          return;
        }
        const batch = rowsToProcess;
        rowsToProcess = [];
        //process some data
        //send them back using websockets
        ws.send(JSON.stringify({ data: batch }));
      };

      connection
        .query('SELECT id, name, address, sale, preexisting, amount FROM users WHERE category = ? ', [category])
        .on('error', (queryError) => {
          // 'end' is still emitted after an error, so the release below runs.
          console.error('userdetails: streaming query failed', queryError);
        })
        .on('result', (row) => {
          rowsToProcess.push(row);
          if (rowsToProcess.length >= BATCH_SIZE) {
            // Stop receiving rows while the batch is handled. If the handling
            // ever becomes asynchronous, move resume() into its completion.
            connection.pause();
            processRows();
            connection.resume();
          }
        })
        .on('end', () => {
          processRows();
          connection.release();
        });
    });
  };

  //do a check, unrelated to streaming
  query('SELECT COUNT(id) as countrows FROM users WHERE category = ? ', [category])
    .then(({ results }) => {
      // if more than 5000, we stream
      if (results[0].countrows > STREAM_THRESHOLD) {
        streamRows();
      }
    })
    .catch((error) => {
      console.error('userdetails: row count query failed', error);
    });
};
您可以试试这个解决方案, 这个我用过很多次了:
/**
 * Runs a query on `connection` as a stream and gathers every emitted item.
 *
 * @param {string} queryString - SQL text passed to `connection.query`.
 * @param {Array} params - Placeholder values passed to `connection.query`.
 * @returns {Promise<Array>} Resolves with all items from the stream's 'data'
 *   events once 'end' fires; rejects with the value of an 'error' event.
 */
const mysqlStreamQueryPromise = (queryString, params) => {
  const collect = (resolve, reject) => {
    const rowStream = connection.query(queryString, params).stream();
    const rows = [];

    // The stream is paused while an item is stored, then resumed right away.
    const onRow = (row) => {
      rowStream.pause();
      rows.push(row);
      rowStream.resume();
    };

    rowStream.on('data', onRow);
    rowStream.on('end', () => resolve(rows));
    rowStream.on('error', (err) => reject(err));
  };
  return new Promise(collect);
};