为什么这段特定的代码在 运行 的四分之一处退出了远程数据库,而不是本地数据库?

Why is this particular piece of code exiting a quarter of the way through a run against a remote db, but not a local db?

我写了这个脚本来提取一些 s3 对象并将它们导入数据库,当我 运行 这个针对本地数据库时,它导入了我知道应该存在的所有 3265 条记录。当我 运行 它针对我的 AWS Postgres 实例时,它挂了一小段路并干净地退出并且只导入大约 50 条左右的记录。我认为这是某种超时,但还没有找到任何具有这种效果的东西。它还会忽略自定义 process.exit。我也梳理了 Postgres 实例日志,但没有发现任何问题。我有点不知所措,我想我可能只是错过了节点的微妙之处或者我编写这段代码的方式。

const { NODE_ENV } = process.env
import config from '../config'

config()

import AWS from 'aws-sdk';
import db from './db/sequelize/models/db_connection'
process.on('uncaughtException', function (exception, p) {
  console.log(p)
  console.log(exception);
});

class FailedImport extends Error {
  constructor(message) {
    Error.captureStackTrace(this, this.constructor);
    this.name = this.constructor.name;
    this.message = message;
  }
}

var s3 = new AWS.S3()
async function listObjects() {
  return await s3.listObjects({Bucket: process.env.S3_BUCKET_NAME}).promise()
}

function importData(objectList) {
  return objectList.Contents.map( async (obj) => {
    try {
      let data = await s3.getObject({ Bucket: process.env.S3_BUCKET_NAME, Key: obj.Key}).promise()
      let body = data.Body
      let dataLines = body.toString().split('\n')
      return Promise.all(dataLines.map( async (line) => {
        try {
          let jsonifiedLine = JSON.parse(line)
          return await db.Site.upsert({ url: jsonifiedLine['api_url'], quantcast_rank: 0})
        } catch(e) {
          console.error(e)
        }
      }))
    }
    catch(err) {
      console.log(err)
    }
  })
}

export function runImport() {
  listObjects().then((objects) => {
    return Promise.all(importData(objects))
      .then(() => console.log('Finished import.'))
      .catch((err) => console.log(err))
  }).catch((err) => {
    console.log(err)
    throw new FailedImport(err)
  })
}

runImport()

事实证明,我混合了两种截然不同的 promise 架构风格,最重要的是我使用 map 的方式不是异步安全的。我最终切换到 for of for 我的循环并尝试 catch async waits 并修复了代码。下面要比较的新代码:

class FailedImport extends Error {
  constructor(message) {
    Error.captureStackTrace(this, this.constructor);
    this.name = this.constructor.name;
    this.message = message;
  }
}

var s3 = new AWS.S3()

async function listObjects() {
  return await s3.listObjects({Bucket: process.env.S3_BUCKET_NAME, Prefix: "datasets/cleaned.data/"}).promise()
}

async function importData(objectList) {
  try {
    for (let obj of objectList.Contents) {
      let data = await s3.getObject({ Bucket: process.env.S3_BUCKET_NAME, Key: obj.Key}).promise()
      console.log(obj.Key)
      let body = data.Body
      let dataLines = body.toString().split('\n')
      dataLines.pop()
      let jsonLines = dataLines.map((row) => JSON.parse(row.trim()))
      for (let line of jsonLines) {
        try {
          await db.Site.upsert({ url: line['api_url'], quantcast_rank: 0})
        }
        catch(err) {
          console.log(err)
        }
      }
    }
  }
  catch(err) {
    console.log(err)
  }
}

export function runImport() {
  listObjects().then((objects) => {
    importData(objects)
      .then(() => console.log('Finished.'))
      .catch((err) => console.error(err))
  }).catch((err) => {
    console.error(err)
  })
}