带回调的数据库事务

DB transaction with callback

我正在尝试弄清楚如何正确设置此代码块,以便提交函数将等待所有行都被插入。目前我正在读取一个 csv 文件,需要在 table 中每列插入一个新行。为此,我还需要在父 table 中添加一行。在我调用 commit 之前,我需要完成所有这些。我还没有掌握回调,所以请温柔点。

db.beginTransaction(function (err) {
    if (err) 
    {
        //could not begin a transaction for some reason.
        logger.error("beginTransaction error: " +err);
    }
    //need to wrap this *************
    db.query("INSERT INTO TABLE VALUES ('" + unique_id + "','real_time','" + msg + "',1,1,LOCALTIMESTAMP)", function(err){
        if(err)
        {
            logger.error("error insert into parent table: "+err);
        }
    });

    for(var i = 0; i < headers.length; i++)
    {
        //replaces single quote (') with two single quotes to escape it ('')
        values[i] = values[i].replace("'","''");
        db.query("INSERT INTO TABLE VALUES ('" + unique_id + "','" + headers[i] + "',0,'" + values[i] + "')", function(err){
            if(err)
            {
                logger.error("error insert into child table: "+err);
            }
        });
    }
    //To here ************
    db.commitTransaction(function (err) {
        if (err) 
        {
            //error during commit
            logger.error("Commit error: "+err);
        }
    }); //end of commitTransaction
    callback();
});//End of beginTransaction

正如 tadman 所说,不要手动转义值并改用参数化查询非常重要。请务必先解决此问题。

不幸的是,它看起来 node-odbc 不支持承诺。您也许可以让它与 Bluebird.promisify 之类的东西一起工作。目前您想要的是跟踪完成了多少成功的插入,然后在它们全部成功完成时提交事务。

let successfulInsertions = 0;
let insertionAttempts = 0;
for(var i = 0; i < headers.length; i++) {
    db.query("INSERT INTO TABLE VALUES (?, ?, ?, ?)", params, err => {
        if (err) {
            logger.error("error insert into child table: "+err);
        }
        else {
          successfulInsertions++;
        }
        insertionAttempts++;

        if (insertionAttempts === headers.length) {
            if (successfulInsertions === insertionAttempts) {
                db.commitTransaction();
            }
            else {
                db.rollbackTransaction();
            }
            callback();
        }
    });
}

有一些库可以帮助解决这个问题,但它们需要稍微重写您的代码。如果您可以在 node-odbc 库上使用 Bluebird 的 promisifyAll,我会使用 async/await 重写它(这仍然是异步的并且功能相同):

await db.beginTransactionAsync();
try {
  await db.queryAsync("INSERT INTO TABLE VALUES (?, ?, ?, ?)", params);

  await Promise.all(headers.map((header, i) =>
    db.queryAsync("INSERT INTO TABLE VALUES (?, ?, ?, ?)", [unique_id, header, 0, values[i])
  );
  await db.commitTransactionAsync();
} catch (err) {
  logger.error(err);
  db.rollbackTransaction();
}

请注意,如果 beginTransaction 由于某种原因抛出错误,此代码将抛出错误。

有三种基本方法可以解决此同步问题,我将在此处使用新样式 arrow functions 进行演示。传统的 Node 方法是使用回调:

a((err, resultA) => {
  // Fires when A is done or errored out

  if (err) {
    // Log, panic, etc.
    return;
  }

  b((err, resultB) => {
    // Fires when A and B are done or A is done and B errored out

    if (err) {
      // Log, panic, etc.
      return;
    }

    c((err, resultC) => {
     // Fires when A, B and C are done or A and B are done and C errored out

      if (err) {
        // Log, panic, etc.
        return;
      }
    });
  });
});

这就是人们所说的 "callback hell",因为随着依赖项的复杂性增加,嵌套和错误传播代码变得越来越荒谬。我发现这种风格对于任何重要的应用程序来说都是不可持续的。

下一个风格是 Promise-driven:

a().then(resultA => {
  // Fires when A is done
  return b();
}).then(resultB => {
  // Fires when B is done
  return c();
}).then(resultC => {
  // Fires when C is done
}).catch(err => {
  // Fires if any of the previous calls produce an error
});

这往往很多 "flatter" 并且更容易理解,但对于本应简单的内容来说,它仍然有很多繁重的语法。较新的 async/await 样式通过在 JavaScript 语法中添加对承诺的支持而建立在承诺之上:

try {
  let resultA = await a();
  let resultB = await a();
  let resultC = await a();
} catch(err) {
  // Fires when any error occurs
}

这适用于任何标记为 async 的函数,例如:

async function runQueries() {
  // async code
}

这可以让你的生活更轻松。您还可以对错误使用传统的 try/catch 表示法,它们会相应地传播。

我不太理解你的代码,你为什么不return或者如果有错误就停止你的代码?

例如,你的代码应该是这样的

db.beginTransaction(function (err) {
    if (err) 
        return logger.error("beginTransaction error: " +err), db.rollback(/*...*/)
    //....
}

其次,您应该使用 async/await 语法更改整个代码。

async function foo () {
    await new Promise((next, err)=> {
        db.beginTransaction(e => e ? err(e) : next())
    })

    await new Promise((next, err)=> {
        db.query(`INSERT INTO TABLE VALUES ('${unique_id}','real_time','${msg}',1,1,LOCALTIMESTAMP)`, e => e ? err(e) : next())
    })

    for (var i = 0; i < headers.length; i++) {
        await new Promise((next, err)=> {
            db.query(`INSERT INTO TABLE VALUES ('${unique_id}','${headers[i]}',0,'${values[i]}')`, e => e ? err(e) : next())
        })
    }

    await new Promise((next, err)=> {
        db.commitTransaction(e => e ? err(e) : next())
    })
}

foo()
.then(()=> callback())
.catch(e=> logger.error(`Error: ${e}`))