使用 mysql-节点传输数据时无法暂停池

Cannot pause pool when streaming data with mysql-node

我使用节点和 mysql 包将数据从节点流式传输到客户端。

这个想法是,

定义一个池,并基于池进行查询。

然后将流式传输行传递给数组。

如果该数组的长度达到一定长度,则暂停流,处理行,通过 websockets 将它们发送到客户端。

恢复直播。重复直到没有其他行剩下。

我正在按照 mysql npm 页面上的示例进行操作,但我得到了 pool.pause is not a function

这是代码

var pool  = mysql.createPool({
  connectionLimit : 100,
  host            : config.host,
  user            : config.user,
  password        : config.password,
  database        : config.database
});

//turn simple queries to promises
const query = (str, ar) => {
  return new Promise(function(resolve, reject) {
    pool.query(str, ar, function (error, results, fields) { 
      if (error) {
          return reject(error);
      }
      resolve({results, fields});
    });
  })//promise
}


const userdetails = (ws, data) => {

//do a check, unrelated to streaming
   query('SELECT COUNT(id) as countrows FROM users WHERE category = ? ', [data.category])
  .then((data)=>{  
     if(data.results[0].countrows > 5000){
        // if more than 5000, we stream
        // the following is based on the mysql code found in their page
        // it has no relation to the promise-based query above
        var query = pool.query('SELECT id, name, address, sale, preexisting, amount FROM users WHERE category = ? ', [data.category])

        query.on('result', row => {  
          rowsToProcess.push(row);
          if (rowsToProcess.length >= 100) { 
            pool.pause();
            processRows();
          }
        });

        query.on('end', () => {
          processRows();
        });

       const processRows = (done) => {
         //process some data
         //send them back using websockets  
         ws.send(JSON.stringify({ data })); 
         pool.resume();  
       }
     }
  })
}

我不知道这是否与进行简单查询、承诺或使用池或其他任何事情有关。这给出了 TypeError: pool.pause is not a function 并且我无法修复它。请指教

谢谢

使用这个

    var pool  = mysql.createPool({
        connectionLimit : 100,
        host            : config.host,
        user            : config.user,
        password        : config.password,
        database        : config.database
      });

      //turn simple queries to promises
      const query = (str, ar) => {
        return new Promise(function(resolve, reject) {
          pool.query(str, ar, function (error, results, fields) { 
            if (error) {
                return reject(error);
            }
            resolve({results, fields});
          });
        })//promise
      }


      const userdetails = (ws, data) => {

      //do a check, unrelated to streaming
         query('SELECT COUNT(id) as countrows FROM users WHERE category = ? ', [data.category])
        .then((data)=>{  
           if(data.results[0].countrows > 5000){
              // if more than 5000, we stream
              // the following is based on the mysql code found in their page
              // it has no relation to the promise-based query above
              var query = pool.query('SELECT id, name, address, sale, preexisting, amount FROM users WHERE category = ? ', [data.category]).stream();

              query.on('result', row => {  
                rowsToProcess.push(row);
                if (rowsToProcess.length >= 100) { 
                  pool.pause();
                  processRows();
                }
              });

              query.on('end', () => {
                processRows();
              });

             const processRows = (done) => {
               //process some data
               //send them back using websockets  
               ws.send(JSON.stringify({ data })); 
               pool.resume();  
             }
           }
        })
      }

您可以试试这个解决方案, 这个我用过很多次了:

    const mysqlStreamQueryPromise = (queryString, params) => {
        return new Promise((resolve, reject) => {
            let streamData = connection.query(queryString,params).stream();
            let data = [];
            streamData.on('data', item => {
                streamData.pause();
                data.push(item);
                streamData.resume();
            });
            streamData.on('end', end => {
                return resolve(data);
            });
            streamData.on('error', error => {
                return reject(error);
            });
        });
    }