Async/Await ForEach 循环中的 Node-Postgres 查询

Async/Await Node-Postgres Queries Within ForEach Loops

编辑:我正在使用节点 v8.0.0

我刚开始学习如何使用 node-postgres 访问 SQL 数据库,但我在访问多个数据库以收集可用格式的数据时遇到了一些麻烦,尤其是在执行多个数据库时forEach 循环中的查询。尝试几次后,我正在尝试 async/await,但出现以下错误:

await client.connect()
  ^^^^^^
SyntaxError: Unexpected identifier

当我尝试使用池或按顺序调用 .query 时,我会得到类似

的结果
1
[]
could not connect to postgres Error: Connection terminated

这是我的代码的简化版本:

const { Client } = require('pg');
const moment = require('moment');
const _ = require('lodash');
const turf = require('@turf/turf');

const connString = // connection string
var collected = []
const CID = 300
const snaptimes = // array of times
var counter=0;
const client = new Client(connString);

function createArray(i,j) {
     // return array of i arrays of length j
}

await client.connect()

snaptimes.forEach(function(snaptime){
  var info = {}; // an object of objects
  // get information at given snaptime from database 1
  const query1 = // parametrized query selecting two columns from database 1
  const result1 = await client.query(query1, [CID,snaptime]);
  var x = result1.rows;
  for (var i = 0; i < x.length; i++) {
     // store data from database 1 into info
     // each row is an object with two fields
  }

  // line up subjects on the hole
  const query2 = // parametrized query grabbing JSON string from database 2
  const result2 = await client.query(query2, [CID,snaptime]);
  const raw = result2.rows[0].JSON_col;
  const line = createArray(19,0); // an array of 19 empty arrays
  for (var i = 0; i < raw.length; i++) {
     // parse JSON object and record data into line 
  }

  // begin to collect data
  var n = 0;
  var g = 0;
  // walk down the line
  for (var i = 18; i > 0; i--) {
    // if no subjects are found at spot i, do nothing, except maybe update g
    if ((line[i] === undefined || line[i].length == 0) && g == 0){
      g = i;
    } else if (line[i] !== undefined && line[i].length != 0) {
      // collect data for each subject if subjects are found
      line[i].forEach(function(subject){
        const query 3 = // parametrized query grabbing data for each subject 
        const result3 = await client.query(query3,[CID,subject,snaptime]);
        x = result3.rows;
        const y = moment(x[0].end_time).diff(moment(snaptime),'minutes');
        var yhat = 0;
        // the summation over info depends on g
        if (g===0){
          for (var j = i; j <= 18; j++){
            yhat = moment.duration(info[j].field1).add(yhat,'m').asMinutes();
          }
        } else {
          for (var j = i; j <= 18; j++){
            if (i<j && j<g+1) {
              yhat = moment.duration(info[j].field2).add(yhat,'m').asMinutes();
            } else {
              yhat = moment.duration(info[j].field1).add(yhat,'m').asMinutes();
            }
          }
        }
        collected.push([y,yhat,n,i]);
      });
    }
    n+=line[i].length;
    g=0;
  }
  // really rough work-around I once used for printing results after a forEach of queries
  counter++;
  if (counter===snaptimes.length){
    console.log(counter);
    console.log(collected);
    client.end(); 
  }
});

问题是由于您的 forEach 回调不是 async:

snaptimes.forEach(function(snaptime){

应该是:

snaptimes.forEach(async function (snaptime) {

await 完全可以识别。

请记住,async 函数 returns 立即和它 returns 最终由 async 函数的 return 语句解决的承诺(或因 async 函数内引发的未捕获异常而被拒绝)。

但还要确保您的 Node 版本支持 async/await:

  • 从 Node 7.6 开始,它可以在没有 --harmony 标志的情况下使用。
  • 在 7.6 之前的节点 7.x 中,您必须使用 --harmony 标志。
  • 在 7.0 之前的 Node 中不可用。

参见:http://node.green/#ES2017-features-async-functions

另请注意,您只能在使用 async 关键字声明的函数内部使用 await 。如果你想在脚本或模块的顶层使用它,那么你需要将它包装在一个立即调用的函数表达式中:

// cannot use await here
(async () => {
  // can use await here
})();
// cannot use await here

示例:

const f = () => new Promise(r => setTimeout(() => r('x'), 500));

let x = await f();
console.log(x);

打印:

$ node t1.js 
/home/rsp/node/test/prom-async/t1.js:3
let x = await f();
              ^
SyntaxError: Unexpected identifier

但是这个:

const f = () => new Promise(r => setTimeout(() => r('x'), 500));

(async () => {
  let x = await f();
  console.log(x);
})();

打印:

$ node t2.js 
x

延迟 0.5 秒后,符合预期。

在不支持 async/await 的 Node 版本上,第一个(不正确的)示例将打印:

$ ~/opt/node-v6.7.0/bin/node t1.js 
/home/rsp/node/test/prom-async/t1.js:3
let x = await f();
              ^
SyntaxError: Unexpected identifier

第二个(正确的)示例将打印不同的错误:

$ ~/opt/node-v6.7.0/bin/node t2.js 
/home/rsp/node/test/prom-async/t2.js:3
(async () => {
       ^
SyntaxError: Unexpected token (

知道这一点很有用,因为不支持 async/await 的 Node 版本不会给你一个像 "async/await not supported" 或类似的有意义的错误,不幸的是。

确保您应该在外部使用 async 块,例如:

async function() {
  return await Promise.resolve('')
}

并且node 7.6.0之后默认支持。在 7.6.0 之前,你应该使用 --harmony 选项来工作。

node -v先检查一下你的版本。

首先,你对async-await还不够了解。别担心,其实很简单;但是你需要阅读文档才能使用这些东西。

更重要的是,您的代码的问题是您只能在 async 函数中使用 await;您在 any 函数之外执行此操作。

首先,这是最接近您编写的代码的解决方案:

const { Client } = require('pg');
const moment = require('moment');
const _ = require('lodash');
const turf = require('@turf/turf');

const connString = // connection string
var collected = []
const CID = 300
const snaptimes = // array of times
var counter=0;
const client = new Client(connString);

function createArray(i,j) {
    // return array of i arrays of length j
}

async function processSnaptime (snaptime) {
    var info = {}; // an object of objects
    // get information at given snaptime from database 1
    const query1 = // parametrized query selecting two columns from database 1
    const result1 = await client.query(query1, [CID,snaptime]);
    var x = result1.rows;
    for (var i = 0; i < x.length; i++) {
      // store data from database 1 into info
      // each row is an object with two fields
    }

    // line up subjects on the hole
    const query2 = // parametrized query grabbing JSON string from database 2
    const result2 = await client.query(query2, [CID,snaptime]);
    const raw = result2.rows[0].JSON_col;
    const line = createArray(19,0); // an array of 19 empty arrays
    for (var i = 0; i < raw.length; i++) {
      // parse JSON object and record data into line
    }

    // begin to collect data
    var n = 0;
    var g = 0;
    // walk down the line
    for (var i = 18; i > 0; i--) {
      // if no subjects are found at spot i, do nothing, except maybe update g
      if ((line[i] === undefined || line[i].length == 0) && g == 0){
        g = i;
      } else if (line[i] !== undefined && line[i].length != 0) {
        // collect data for each subject if subjects are found
        line[i].forEach(function(subject){
          const query 3 = // parametrized query grabbing data for each subject
          const result3 = await client.query(query3,[CID,subject,snaptime]);
          x = result3.rows;
          const y = moment(x[0].end_time).diff(moment(snaptime),'minutes');
          var yhat = 0;
          // the summation over info depends on g
          if (g===0){
            for (var j = i; j <= 18; j++){
              yhat = moment.duration(info[j].field1).add(yhat,'m').asMinutes();
            }
          } else {
            for (var j = i; j <= 18; j++){
              if (i<j && j<g+1) {
                yhat = moment.duration(info[j].field2).add(yhat,'m').asMinutes();
              } else {
                yhat = moment.duration(info[j].field1).add(yhat,'m').asMinutes();
              }
            }
          }
          collected.push([y,yhat,n,i]);
        });
      }
      n+=line[i].length;
      g=0;
    }
    // really rough work-around I once used for printing results after a forEach of queries
    counter++;
    if (counter===snaptimes.length){
      console.log(counter);
      console.log(collected);
    }
}

async function run () {
  for (let snaptime of snaptimes) {
    await processSnaptime(snaptime);
  }
}

/* to run all of them concurrently:
function run () {
  let procs = [];
  for (let snaptime of snaptimes) {
    procs.push(processSnaptime(snaptime));
  }
  return Promise.all(procs);
}
*/

client.connect().then(run).then(() => client.end());

client.connect returns 一个承诺,我使用 then 在它解决后调用 run。当那个部分结束后,client.end()就可以安全调用了。

run 是一个 async 函数,因此它可以使用 await 使代码更具可读性。 processSnaptime.

也是如此

当然我不能运行你的代码,所以我只能希望我没有犯任何错误。