在 Node.js 中使用 Knex SQL 查询构建器和 Bluebird 执行 ACID 事务时如何退出循环

Exiting Loop with ACID Transactions with Knex SQL Query Builder & Bluebird in Node.js

我们正在 Node 中使用 Knex SQL 查询构建器执行 ACID 事务,并且在配合 Knex 使用循环时遇到了一些奇怪的行为。下面的代码接收一个 table 数组,然后根据条件执行插入或更新。第一个 table 是 'transactionHeader',会首先被处理。随后,在同一个事务中处理 'transactionDetail' table 中的各行。新的键(rowid)会累积到 'rowids' 数组中。

问题:主要问题是,如果 Knex 返回错误,似乎没有办法退出 processTransactionDetail() 中的循环。无论是 throw 还是 return,都无法退出循环或函数。这意味着如果处理 transactionDetail 时出错,它仍会继续处理剩余的行,然后才退出。

    // Collects one { table, rowid } record per row written; it is handed to
    // `callback` as `data` on both the success and the failure path.
    let rowids: any[] = [];

    // Writes the transaction header first and then hands over to
    // processTransactionDetail() for the remaining tables. The handler returns
    // nothing, so completion is signalled explicitly: trx.rollback here and
    // trx.commit at the end of processTransactionDetail().
    knex.transaction(function(trx) {

        // Process transactionHeader
        // A truthy rowid on the header selects the update path, otherwise the
        // header is inserted.
        if (transactionHeader.rowid) {

            // Update transactionHeader
            trx('transaction')
                .transacting(trx)
                .update(transactionHeader)
                .where('rowid', transactionHeader.rowid)
                .then(function(transactionrowid) {
                    // Records the rowid that was matched on, not the value the
                    // update resolved with.
                    rowids.push({ table: 'transaction', rowid: transactionHeader.rowid });
                    // Update transactionDetail rows.
                    // The call's result is not returned, so this chain does not
                    // wait for the detail rows.
                    processTransactionDetail(transactionrowid, trx);
                })
                // Any rejection of the header query rolls the transaction back,
                // with the error passed to trx.rollback.
                .catch(trx.rollback);

        } else {

            // Insert transactionHeader
            trx('transaction')
                .transacting(trx)
                // NOTE(review): the second argument presumably asks Knex to
                // return the new 'rowid' — confirm for the dialect in use.
                .insert(transactionHeader, 'rowid')
                .then(function(transactionrowid) {
                    // Here the value the insert resolved with is recorded as
                    // the header's rowid.
                    rowids.push({ table: 'transaction', rowid: transactionrowid });
                    // Insert transactionDetail rows.
                    // As above, the result is not returned to this chain.
                    processTransactionDetail(transactionrowid, trx);
                })
                .catch(trx.rollback);
        }

    // `inserts` is not used; the outcome is reported from `rowids`.
    }).then(function(inserts) {
        console.log('success!', rowids)
        callback(null, { success: true }, { data: rowids })
        return;
    // Failures are also reported with `null` as the first callback argument;
    // the caller has to inspect `success` and `message`.
    }).catch(function(error) {
        console.error('error', error);
        callback(null, {
            success: false,
            message: error.message
        }, { data: rowids })
        return;
    });

    /**
     * Process transactionDetail rows.
     *
     * Starts one update or insert query per row of every table in
     * `tablesToProcess` except the first, appends a record to `rowids` as each
     * query resolves, and chains `trx.commit` onto the last query started.
     *
     * @param transactionHeaderRowID - value passed by the header step; it is
     *   not read anywhere in this function
     * @param trx - the Knex transaction every query is attached to
     */
    function processTransactionDetail(transactionHeaderRowID: number, trx) {

        // Holds only the most recently started query; every loop iteration
        // overwrites it.
        var promise: any;
        // Declared once outside the loops, so callbacks that run later read
        // whatever table this variable holds at that time.
        let table: TABLE;
        let rowid: number;

        // Index 0 is skipped: per the surrounding text the first table is the
        // header, which is handled before this function is called.
        for (let i = 1; i < tablesToProcess.length; i++) {

            table = tablesToProcess[i];
            // The first row's rowid decides update vs. insert for the whole
            // table.
            rowid = table.data[0].rowid;

            // Update
            if (rowid) {

                // The loop only starts the queries and does not wait for them,
                // so a .catch callback that runs later has no loop left to
                // break out of.
                for (let row = 0; row < table.data.length; row++) {

                    promise = knex(table.name)
                        .transacting(trx)
                        .update(table.data[row])
                        // Every row is matched against the first row's rowid.
                        .where('rowid', rowid)
                        // This callback's `rowid` parameter shadows the outer
                        // variable.
                        // NOTE: the `})` that should close this callback is
                        // missing in the snippet as posted.
                        .then(function(rowid) {                                
                            rowids.push({ table: table.name, rowid: rowid });
                        .catch(function(error) {                                
                             // --------------------------------
                             // **PROBLEM**: THERE IS NO WAY TO BREAK FROM THE LOOP
                             // --------------------------------
                             // Throws a string, which discards the original
                             // `error`; the `return` after it is unreachable.
                             throw 'error';
                             return;
                             // --------------------------------
                        })
                 } 

            // Insert
            } else {

                // Same pattern as the update branch: queries are started
                // without waiting for the previous one.
                for (let row = 0; row < table.data.length; row++) {

                    promise = knex(table.name)
                        .transacting(trx)
                        .insert(table.data[row])
                        .then(function(rowid) {
                            rowids.push({ table: table.name, rowid: rowid });
                        })
                        .catch(function(error) {
                             // --------------------------------
                             // **PROBLEM**: THERE IS NO WAY TO BREAK FROM THE LOOP
                             // --------------------------------
                             // Throws a string, which discards the original
                             // `error`; the `return` after it is unreachable.
                             throw 'error';
                             return;
                             // --------------------------------
                        });
                }
            }
        }

        // Commit is chained onto the last query only. `promise` is still
        // undefined when no query was started, in which case this line throws.
        // No rejection handler is attached here.
        promise.then(function(x) {
            promise.then(trx.commit);
        });
    }

更新: 这是一个合适的结构吗?不确定底部的错误处理程序是否真的需要。

    // Second attempt: rows are processed one at a time with Promise.mapSeries
    // (presumably Bluebird's Promise, as named in the title) and the outcome
    // is reported through `callback`.
    knex.transaction(function(trx) {

        // Update Row
        // Used as a mapSeries mapper with `table` and `rowid` pre-bound, so
        // `row` is the current element of table.data.
        function updateRow(table, rowid, row) {
            return knex(table.name)
                .transacting(trx)
                .update(row)
                .where('rowid', rowid)
                // This callback's `rowid` parameter shadows the function
                // parameter of the same name.
                .then(function(rowid) {

                    rowids.push({
                        table: table.name,
                        rowid: rowid
                    });
                });
        }

        // Insert Row
        // Same calling convention as updateRow; the `rowid` parameter is not
        // read here.
        function insertRow(table, rowid, row) {
            return knex(table.name)
                .transacting(trx)
                .insert(row)
                .then(function(rowid) {

                    rowids.push({
                        table: table.name,
                        rowid: rowid
                    });
                });
        }

        // Process Tables
        // This chain is not returned from the transaction handler; commit and
        // rollback are called explicitly further down.
        Promise.mapSeries(tablesToProcess, function(table) {

            // The first row's rowid decides update vs. insert for the whole
            // table and is the key bound for every row.
            let rowid = table.data[0].rowid;

            // choose the right function to apply to the rows
            var fn = rowid ? updateRow : insertRow;

            // fn needs table and rowid
            fn = fn.bind(this, table, rowid);

            // Process Rows
            return Promise.mapSeries(table.data, fn)
                .then(function(result) {

                    // result is an array with all the knex promises result
                    return result;

                }).catch(function(err) {
                    console.log('an error happened');
                    //trx.rollback();  // QUESTION: IS THIS NEEDED?
                    throw err;     // IS THERE A WAY TO
                });

        }).then(function(result) {
            console.log('success', result);
            trx.commit();
            // callback(null, { success: true }, { data: rowids })
            // return;
        }).catch(function(error) {
            console.log('error', error);
            trx.rollback();
            // NOTE(review): the handler at the bottom of the transaction also
            // calls `callback` on failure, so a rollback presumably reports
            // the error twice — confirm.
            callback(null, { success: false, message: error.message }, { data: rowids })
        });

    // `inserts` is not used; the outcome is reported from `rowids`.
    }).then(function(inserts) {
        console.log('success!', rowids)
        callback(null, { success: true }, { data: rowids })
    }).catch(function(error) {
        console.log('error', error);
        callback(null, { success: false, message: error.message }, { data: rowids })
    });

您处理的是 Promise,因此循环时必须使用支持 Promise 的方法,例如 Bluebird 的 mapSeries():

var Promise = require('bluebird');

/**
 * Updates one row of `table` inside the current transaction and records the
 * written key in `rowids`.
 *
 * Meant to be used as a `Promise.mapSeries` mapper with `table` and `rowid`
 * pre-bound, so `row` is the element of `table.data` itself, not an index.
 *
 * @param table - table descriptor; only `table.name` is read here
 * @param rowid - key matched against the `rowid` column
 * @param row - column values to write
 * @returns the query promise; it rejects if the update fails, which is what
 *   stops a surrounding `mapSeries`
 */
function updateRow(table, rowid, row) {
  return knex(table.name)
    .transacting(trx)
    // `row` already is the row object; indexing `table.data` with it (as the
    // previous version did) always yielded `undefined`.
    .update(row)
    .where('rowid', rowid)
    .then(function() {
      // Knex resolves update() with the number of affected rows, so record
      // the key that was matched on rather than the resolved value.
      rowids.push({
        table: table.name,
        rowid: rowid
      });
    });
}

/**
 * Inserts one row into `table` inside the current transaction and records the
 * value the insert resolved with in `rowids`.
 *
 * Meant to be used as a `Promise.mapSeries` mapper with `table` and `rowid`
 * pre-bound, so `row` is the element of `table.data` itself, not an index.
 *
 * @param table - table descriptor; only `table.name` is read here
 * @param rowid - unused; kept so the signature matches `updateRow` and both
 *   can be bound the same way
 * @param row - column values to insert
 * @returns the query promise; it rejects if the insert fails, which is what
 *   stops a surrounding `mapSeries`
 */
function insertRow(table, rowid, row) {
  return knex(table.name)
    .transacting(trx)
    // `row` already is the row object; indexing `table.data` with it (as the
    // previous version did) always yielded `undefined`.
    .insert(row)
    .then(function(insertedRowid) {
      // Named differently from the unused `rowid` parameter to avoid
      // shadowing it.
      rowids.push({
        table: table.name,
        rowid: insertedRowid
      });
    });
}

// Process the tables one after another. If there is an error, the iteration
// stops immediately and the remaining tables and rows are skipped.
Promise.mapSeries(tablesToProcess, function(table) {
  // Declared locally: assigning to an undeclared `rowid` creates an implicit
  // global (and throws a ReferenceError in strict mode).
  // A table without rows is treated as "nothing to do" instead of throwing on
  // `table.data[0].rowid`.
  var firstRow = table.data[0];
  var rowid = firstRow ? firstRow.rowid : undefined;

  // choose the right function to apply to the rows
  // NOTE(review): the first row's rowid decides update vs. insert for the
  // whole table and is the key bound for every row — confirm that this is the
  // intended matching rule.
  var fn = (rowid ? updateRow : insertRow).bind(null, table, rowid);

  // call fn for each row
  // if there is an error, the iteration will stop immediately
  return Promise.mapSeries(table.data, fn)
    .catch(function(err) {
      console.log('an error happened');
      // No rollback here: rethrowing reaches the final catch below, which
      // rolls back exactly once.
      throw err;
    });
}).then(function(result) {
  console.log('all is good');
  // you can safely commit here
  trx.commit();
}).catch(function(err) {
  console.log('an error happened');
  // Pass the error along so the transaction rejects with the real cause.
  trx.rollback(err);
});

更新

关于您的问题:

// Runs every table/row write inside one Knex transaction, one row at a time,
// and reports the outcome through `callback`.
knex.transaction(function (trx) {
    // Update Row
    // Used as a mapSeries mapper with `table` and `rowid` pre-bound, so `row`
    // is the current element of table.data.
    function updateRow(table, rowid, row) {
        return knex(table.name)
            .transacting(trx)
            .update(row)
            .where('rowid', rowid)
            // NOTE(review): this callback's `rowid` shadows the parameter
            // above. Per the Knex docs, update() resolves with the affected-row
            // count unless returning() is used — confirm which value should be
            // recorded.
            .then(function (rowid) {
                rowids.push({
                    table: table.name,
                    rowid: rowid
                });
            });
    }

    // Insert Row
    // Same calling convention as updateRow; the `rowid` parameter is not read
    // here.
    function insertRow(table, rowid, row) {
        return knex(table.name)
            .transacting(trx)
            .insert(row)
            .then(function (rowid) {
                rowids.push({
                    table: table.name,
                    rowid: rowid
                });
            });
    }

    // you need to return here so the 'then(function (inserts)' and the catch work
    // Process Tables
    // NOTE(review): per the Knex docs, a promise returned from the handler
    // already drives commit/rollback, which would make the explicit
    // trx.commit()/trx.rollback() calls below redundant — confirm for the Knex
    // version in use.
    return Promise.mapSeries(tablesToProcess, function (table) {
        // The first row's rowid decides update vs. insert for the whole table
        // and is the key bound for every row.
        let rowid = table.data[0].rowid;

        // choose the right function to apply to the rows
        var fn = rowid ? updateRow : insertRow;

        // fn needs table and rowid
        fn = fn.bind(this, table, rowid);

        // Process Rows
        // if you don't do anything special in the then and the catch, you can remove them
        return Promise.mapSeries(table.data, fn)
            .then(function (result) {
                // result is an array with all the knex promises result
                return result;
            }).catch(function (err) {
                // this catch is not necessary,
                // you can remove it you don't need to do something here
                console.log('an error happened');
                //trx.rollback();  // QUESTION: IS THIS NEEDED? << // no, my mistake, the rollback is done on the other catch
                throw err;     // IS THERE A WAY TO
            });
    }).then(function (result) {
        console.log('success', result);
        trx.commit();
        return result;
        // callback(null, { success: true }, { data: rowids })
        // return;
    }).catch(function (error) {
        console.log('error', error);
        trx.rollback();
        throw error; // always rethrow error when you chain, if you don't, it's like the promise is resolved (ok) 
        // you already do this below
        // callback(null, { success: false, message: error.message }, { data: rowids });
    });
// `inserts` is not used; the outcome is reported from `rowids`.
}).then(function (inserts) {
    console.log('success!', rowids)
    callback(null, { success: true }, { data: rowids })
// This handler also catches anything thrown by the success handler above, so
// an exception inside `callback` there leads to a second `callback` call.
}).catch(function (error) {
    console.log('error', error);
    callback(null, { success: false, message: error.message }, { data: rowids })
});

底部的 2 个 then 和 2 个 catch 可以合并。另外,为什么还要用回调(callback)?除非无法避免,否则最好不要混用 Promise 和回调。