节点 JS 在使用事务时将对象数组插入 mysql 数据库

Node JS Inserting array of objects to mysql database when using transactions

我正在使用 node-mysql 将记录添加到数据库,但是当要插入的记录是一个对象数组并且我需要将操作作为一个 事务时,我面临着挑战。我通过创建一个测试项目来简化我的问题,以更好地解释我的问题。

假设我必须使用表 usersorders,要插入的数据如下所示

var user = {
   name: "Dennis Wanyonyi",
   email: "example@email.com"
};

var orders = [{
   order_date: new Date(),
   price: 14.99
}, {
   order_date: new Date(),
   price: 39.99
}];

我想先将 user 插入数据库,然后使用 insertId 为该用户添加每个 orders。我正在使用事务,因为如果出现错误,我想回滚整个过程。这是我尝试使用 node-mysql transactions.

插入所有记录的方法
connection.beginTransaction(function(err) {
  if (err) { throw err; }
  connection.query('INSERT INTO users SET ?', user, function(err, result) {
    if (err) {
      return connection.rollback(function() {
        throw err;
      });
    }


    for (var i = 0; i < orders.length; i++) {

      orders[i].user_id = result.insertId;

        connection.query('INSERT INTO orders SET ?', orders[i], function(err, result2) {
          if (err) {
            return connection.rollback(function() {
              throw err;
            });
          }  
          connection.commit(function(err) {
            if (err) {
              return connection.rollback(function() {
                throw err;
              });
            }
            console.log('success!');
          });
        });
       }
      });
     });

但是我在遍历 orders 数组时遇到问题,而不必在 for 循环

中多次调用 connection.commit

您需要使用异步库来进行此类操作。

connection.beginTransaction(function(err) {   
if (err) { throw err; }
 async.waterfall([
        function(cb){
            createUser(userDetail, function(err, data){
              if(err) return cb(err);
              cb(null, data.userId);
           });
        },
      function(userid,cb){
        createOrderForUser(userid,orders, function() {
            if(err) return cb(err);
            cb(null);
        });
      }
    ], function(err){
      if (err) 
        retrun connection.rollback(function() {
              throw err;
        });

       connection.commit(function(err) {
        if (err) {
          return connection.rollback(function() {
            throw err;
          });
        }
        console.log('success!');
      });     
    }); 
});
var createUser = function(userdetail, cb){ 
//-- Creation of Orders
};
var createOrderForUser = function (userId, orders, cb) {
  async.each(orders, function(order, callback){
 //-- create orders for users
},function(err){
   // doing err checking.
    cb();
  });
};

Node.js 中的某些任务是异步的(如 I/O、数据库等),并且有很多 LIBS 可以帮助处理它。

但是如果你不想使用任何库,为了在 JS 中迭代数组并将其用于异步功能,最好将其实现为递归函数。

connection.beginTransaction(function(err) {
if (err) {
    throw err;
}
connection.query('INSERT INTO users SET ?', user, function(err, result) {
    if (err) {
        return connection.rollback(function() {
            throw err;
        });
    }
    // console.log(result.insertId) --> do any thing if need with inserted ID 

    var insertOrder = function(nextId) {
        console.log(nextId);
        if ((orders.length - 1) < nextId) {
            connection.commit(function(err) {
                if (err) {
                    return connection.rollback(function() {
                        throw err;
                    })
                }
                console.log(" ok");
            });

        } else {
            console.log(orders[nextId]);
            connection.query('INSERT INTO orders SET ?', orders[nextId], function(err, result2) {
                if (err) {
                    return connection.rollback(function() {
                        throw err;
                    });
                }

                insertOrder(nextId + 1);
            });
        }
    }
    insertOrder(0);

});
});

如您所见,我将您的 for 循环重写为内部的递归函数。

看看能不能写一个Stored Procedure来封装查询,在SP里有START TRANSACTION ... COMMIT

棘手的部分是需要将事物列表传递给 SP,因为没有 "array" 机制。实现这一点的一种方法是使用一个 commalist(或使用其他分隔符),然后使用一个循环来分离列表。

我建议先在 for 循环中为订单 table 的多行插入查询构建一个简单的字符串,然后在 for 循环外执行它。使用 for 循环仅构造字符串。因此,您可以随时回滚查询或出错。通过多次插入查询字符串,我的意思如下:

INSERT INTO your_table_name
    (column1,column2,column3)
VALUES
    (1,2,3),
    (4,5,6),
    (7,8,9);

我会使用 async.each 进行迭代并并行触发所有查询。如果某些查询失败,将调用 asyncCallback 并出错,程序将停止处理查询。这将表明我们应该停止执行查询和回滚。如果没有错误,我们可以调用提交。

我将代码进一步解耦并将其拆分为函数:

function rollback(connection, err) {
  connection.rollback(function () {
    throw err;
  });
}

function commit(connection) {
  connection.commit(function (err) {
    if (err) {
      rollback(connection, err);
    }

    console.log('success!');
  });
}

function insertUser(user, callback) {
  connection.query('INSERT INTO users SET ?', user, function (err, result) {
    return callback(err, result);
  });
}

function insertOrders(orders, userId, callback) {
  async.each(orders, function (order, asyncCallback) {
    order.user_id = userId;

    connection.query('INSERT INTO orders SET ?', order, function (err, data) {
      return asyncCallback(err, data);
    });
  }, function (err) {
    if (err) {
      // One of the iterations above produced an error.
      // All processing will stop and we have to rollback.
      return callback(err);
    }

    // Return without errors
    return callback();
  });
}

connection.beginTransaction(function (err) {
  if (err) {
    throw err;
  }

  insertUser(user, function (err, result) {
    if (err) {
      rollback(connection, err);
    }

    insertOrders(orders, result.insertId, function (err, data) {
      if (err) {
        rollback(connection, err);
      } else {
        commit(connection);
      }
    });
  });
});

您可以为此使用 Bluebird 的 Promise.all 功能。

var promiseArray = dataArray.map(function(data){
    return new BluebirdPromise(function(resolve, reject){
        connection.insertData(function(error, response){
            if(error) reject(error);
            else resolve(response);
        }); //This is obviously a mock
    });
});

之后:

BluebirdPromise.all(promiseArray).then(function(result){
    //result will be the array of "response"s from resolve(response);
    database.commit();
});

这样,您可以异步处理所有插入,然后只使用一次 database.commit()。

        currentLogs = [
 { socket_id: 'Server', message: 'Socketio online', data: 'Port  3333', logged: '2014-05-14 14:41:11' },
 { socket_id: 'Server', message: 'Waiting for Pi to connect...', data: 'Port: 8082', logged: '2014-05-14 14:41:11' }
];

console.warn(currentLogs.map(logs=>[ logs.socket_id , logs.message , logs.data , logs.logged ]));