在 Node.js 中使用 Knex SQL 查询生成器和 Bluebird 执行 ACID 事务时如何退出循环
Exiting Loop with ACID Transactions with Knex SQL Query Builder & Bluebird in Node.js
我们正在使用 Knex SQL 查询生成器在 Node 中执行 ACID 事务,在循环中使用 Knex 时遇到了一些奇怪的行为。下面的代码接收一个 table 数组,然后根据条件执行插入或更新。第一个 table 是 'transactionHeader',会被首先处理。然后,在同一个事务中处理 'transactionDetail' table 中的各行。新生成的键(rowids)累积在 'rowids' 数组中。
问题:主要问题是,如果 Knex 返回了错误,似乎没有办法退出 processTransactionDetail() 中的循环。throw 和 return 都无法退出循环或函数。这意味着如果处理 transactionDetail 时出错,它会继续处理完剩余的行之后才退出。
// Question code, as posted: writes the transaction header inside a Knex
// transaction, then hands the remaining tables to processTransactionDetail().
// Collects one { table, rowid } entry per row written; passed to callback below.
let rowids: any[] = [];
knex.transaction(function(trx) {
// Process transactionHeader
// NOTE: neither branch returns its query chain from this handler, and the
// result of processTransactionDetail() is not returned from the .then callbacks.
if (transactionHeader.rowid) {
// Update transactionHeader
trx('transaction')
.transacting(trx)
.update(transactionHeader)
.where('rowid', transactionHeader.rowid)
.then(function(transactionrowid) {
// Records the header's existing rowid, not the value update() resolved with.
rowids.push({ table: 'transaction', rowid: transactionHeader.rowid });
// Update transactionDetail rows.
// NOTE(review): transactionrowid is whatever update() resolved with,
// presumably an affected-row count rather than a key — confirm.
processTransactionDetail(transactionrowid, trx);
})
// A failed header query is forwarded to trx.rollback together with its error.
.catch(trx.rollback);
} else {
// Insert transactionHeader
trx('transaction')
.transacting(trx)
.insert(transactionHeader, 'rowid')
.then(function(transactionrowid) {
// Records the value insert() resolved with as the new header rowid.
rowids.push({ table: 'transaction', rowid: transactionrowid });
// Insert transactionDetail rows.
processTransactionDetail(transactionrowid, trx);
})
// A failed header query is forwarded to trx.rollback together with its error.
.catch(trx.rollback);
}
}).then(function(inserts) {
// Runs when the transaction promise fulfils; reports the collected rowids.
console.log('success!', rowids)
callback(null, { success: true }, { data: rowids })
return;
}).catch(function(error) {
// Runs when the transaction promise rejects. The failure is reported through
// callback's second argument; the first argument stays null.
console.error('error', error);
callback(null, {
success: false,
message: error.message
}, { data: rowids })
return;
});
/*
 * Process transactionDetail rows.
 *
 * Question code, kept as posted apart from restoring the missing `})` that
 * closes the first `.then` callback (without it the snippet does not parse).
 * The behaviour the question asks about is intentionally left in place.
 *
 * For every table after index 0 (index 0 is skipped by the loop), each row is
 * either updated or inserted depending on whether the table's first row
 * carries a truthy `rowid`.
 *
 * Why the loop cannot be exited from the `.catch` callbacks: both `for` loops
 * run to completion synchronously, creating one promise chain per row. The
 * `.then` / `.catch` callbacks only run later, so a `throw` or `return`
 * inside them affects that single chain and never the loop.
 *
 * transactionHeaderRowID: accepted but not used in the body.
 * trx: the Knex transaction object every query is attached to.
 */
function processTransactionDetail(transactionHeaderRowID: number, trx) {
  // Holds only the chain created last; earlier chains are overwritten.
  var promise: any;
  let table: TABLE;
  let rowid: number;
  for (let i = 1; i < tablesToProcess.length; i++) {
    table = tablesToProcess[i];
    // The first row's rowid decides update-vs-insert for the whole table and
    // is also the value used in every WHERE clause below.
    rowid = table.data[0].rowid;
    // Update
    if (rowid) {
      for (let row = 0; row < table.data.length; row++) {
        promise = knex(table.name)
          .transacting(trx)
          .update(table.data[row])
          .where('rowid', rowid)
          .then(function(rowid) {
            rowids.push({ table: table.name, rowid: rowid });
          })
          .catch(function(error) {
            // --------------------------------
            // **PROBLEM**: THERE IS NO WAY TO BREAK FROM THE LOOP
            // --------------------------------
            throw 'error';
            return;
            // --------------------------------
          })
      }
      // Insert
    } else {
      for (let row = 0; row < table.data.length; row++) {
        promise = knex(table.name)
          .transacting(trx)
          .insert(table.data[row])
          .then(function(rowid) {
            rowids.push({ table: table.name, rowid: rowid });
          })
          .catch(function(error) {
            // --------------------------------
            // **PROBLEM**: THERE IS NO WAY TO BREAK FROM THE LOOP
            // --------------------------------
            throw 'error';
            return;
            // --------------------------------
          });
      }
    }
  }
  // Commit is chained onto the last chain only. `promise` is still undefined
  // here if neither loop body ever ran.
  promise.then(function(x) {
    promise.then(trx.commit);
  });
}
更新: 这是一个合适的结构吗?不确定底部的错误处理程序是否真的需要。
// Question's second attempt, as posted: the same work restructured around
// Bluebird's Promise.mapSeries, with the row helpers nested in the handler.
knex.transaction(function(trx) {
// Update Row
// Called with (table, rowid) pre-bound and the current row as the last argument.
function updateRow(table, rowid, row) {
return knex(table.name)
.transacting(trx)
.update(row)
.where('rowid', rowid)
.then(function(rowid) {
// The callback parameter shadows the outer rowid; the pushed value is
// whatever update() resolved with.
rowids.push({
table: table.name,
rowid: rowid
});
});
}
// Insert Row
// Same calling convention as updateRow; rowid is accepted but not used.
function insertRow(table, rowid, row) {
return knex(table.name)
.transacting(trx)
.insert(row)
.then(function(rowid) {
rowids.push({
table: table.name,
rowid: rowid
});
});
}
// Process Tables
// NOTE: this chain is not returned from the transaction handler.
Promise.mapSeries(tablesToProcess, function(table) {
let rowid = table.data[0].rowid;
// choose the right function to apply to the rows
var fn = rowid ? updateRow : insertRow;
// fn needs table and rowid
fn = fn.bind(this, table, rowid);
// Process Rows
return Promise.mapSeries(table.data, fn)
.then(function(result) {
// result is an array with all the knex promises result
return result;
}).catch(function(err) {
console.log('an error happened');
//trx.rollback(); // QUESTION: IS THIS NEEDED?
throw err; // IS THERE A WAY TO
});
}).then(function(result) {
console.log('success', result);
trx.commit();
// callback(null, { success: true }, { data: rowids })
// return;
}).catch(function(error) {
// This handler does not rethrow; it rolls back and calls callback itself.
// NOTE(review): if trx.rollback() rejects the transaction promise, the outer
// .catch below would invoke callback a second time — confirm.
console.log('error', error);
trx.rollback();
callback(null, { success: false, message: error.message }, { data: rowids })
});
}).then(function(inserts) {
// Runs when the transaction promise fulfils.
console.log('success!', rowids)
callback(null, { success: true }, { data: rowids })
}).catch(function(error) {
// Runs when the transaction promise rejects.
console.log('error', error);
callback(null, { success: false, message: error.message }, { data: rowids })
});
您处理的是 Promise,因此必须使用支持 Promise 的方式来进行循环,例如 Bluebird 的 mapSeries():
var Promise = require('bluebird');
/**
 * Updates one row of `table` inside the surrounding transaction and records
 * the outcome in `rowids`.
 *
 * Intended to be called with `table` and `rowid` fixed and the current element
 * of `table.data` as `row` (this is what `Promise.mapSeries(table.data, fn)`
 * supplies: the item itself, not its index).
 *
 * @param table - table descriptor; `table.name` is the SQL table to update
 * @param rowid - value matched against the `rowid` column
 * @param row - the row object whose columns are written
 * @returns the promise chain for the query; it settles after the result has
 *   been appended to `rowids`
 */
function updateRow(table, rowid, row) {
  // `row` is already the row object, so it is passed straight to update();
  // indexing `table.data` with it would yield undefined.
  return knex(table.name)
    .transacting(trx)
    .update(row)
    .where('rowid', rowid)
    .then(function (result) {
      // NOTE(review): `result` is whatever update() resolved with, presumably
      // the affected-row count rather than a key — confirm.
      rowids.push({
        table: table.name,
        rowid: result
      });
    });
}
/**
 * Inserts one row into `table` inside the surrounding transaction and records
 * the outcome in `rowids`.
 *
 * Shares its signature with `updateRow` so either can be chosen per table and
 * driven by `Promise.mapSeries(table.data, fn)`, which supplies the row object
 * itself (not its index) as the last argument.
 *
 * @param table - table descriptor; `table.name` is the SQL table to insert into
 * @param rowid - unused; present only to keep the signature parallel to `updateRow`
 * @param row - the row object to insert
 * @returns the promise chain for the query; it settles after the result has
 *   been appended to `rowids`
 */
function insertRow(table, rowid, row) {
  // `row` is already the row object, so it is passed straight to insert();
  // indexing `table.data` with it would yield undefined.
  return knex(table.name)
    .transacting(trx)
    .insert(row)
    .then(function (result) {
      // Stores whatever insert() resolved with as the new row's id.
      rowids.push({
        table: table.name,
        rowid: result
      });
    });
}
// Process the tables strictly one after another. mapSeries waits for each
// returned promise before moving on, so the first rejected query stops the
// iteration: no further rows or tables are processed.
Promise.mapSeries(tablesToProcess, function (table) {
  // Declared locally; the bare assignment created an implicit global.
  const rowid = table.data[0].rowid;
  // A truthy rowid on the table's first row selects update, otherwise insert.
  const fn = rowid ? updateRow : insertRow;
  // Rows are handled in series too; mapSeries supplies each row object, while
  // table and rowid are fixed through the closure.
  return Promise.mapSeries(table.data, function (row) {
    return fn(table, rowid, row);
  });
}).then(function () {
  console.log('all is good');
  // you can safely commit here
  trx.commit();
}).catch(function (err) {
  console.log('an error happened');
  // Single rollback point for any failure above (an inner per-table rollback
  // would roll back twice). The error is handed over so the cause is kept.
  trx.rollback(err);
});
更新
关于您的问题:
// Corrected structure: the handler returns the whole promise chain, so Knex
// commits when it fulfils and rolls back when it rejects. No manual
// trx.commit() / trx.rollback() is needed; a bare trx.rollback() would settle
// the transaction before the real error reaches the outer catch.
knex.transaction(function (trx) {
  // Update Row: writes `row` where the rowid column equals `rowid`.
  function updateRow(table, rowid, row) {
    return knex(table.name)
      .transacting(trx)
      .update(row)
      .where('rowid', rowid)
      .then(function (result) {
        // NOTE(review): `result` is whatever update() resolved with,
        // presumably the affected-row count — confirm.
        rowids.push({
          table: table.name,
          rowid: result
        });
      });
  }
  // Insert Row: `rowid` is unused, kept so both helpers share one signature.
  function insertRow(table, rowid, row) {
    return knex(table.name)
      .transacting(trx)
      .insert(row)
      .then(function (result) {
        rowids.push({
          table: table.name,
          rowid: result
        });
      });
  }
  // you need to return here so the 'then(function (inserts)' and the catch work
  // Process Tables
  return Promise.mapSeries(tablesToProcess, function (table) {
    const rowid = table.data[0].rowid;
    // choose the right function to apply to the rows
    const fn = rowid ? updateRow : insertRow;
    // Process Rows (in series; the first failure stops the iteration)
    return Promise.mapSeries(table.data, function (row) {
      return fn(table, rowid, row);
    }).catch(function (err) {
      // this catch is not necessary,
      // you can remove it if you don't need to do something here
      console.log('an error happened');
      throw err;
    });
  }).then(function (result) {
    console.log('success', result);
    // Fulfilling the returned chain is what makes Knex commit.
    return result;
  }).catch(function (error) {
    console.log('error', error);
    // always rethrow error when you chain, if you don't, it's like the promise is resolved (ok)
    // Rejecting the returned chain is what makes Knex roll back.
    throw error;
  });
}).then(function (inserts) {
  console.log('success!', rowids)
  callback(null, { success: true }, { data: rowids })
}).catch(function (error) {
  console.log('error', error);
  callback(null, { success: false, message: error.message }, { data: rowids })
});
底部的两个 then 和两个 catch 可以合并。另外,为什么要使用回调?除非无法避免,否则最好不要把 Promise 和回调混在一起使用。
我们正在使用 Knex SQL 查询生成器在 Node 中执行 ACID 事务,在循环中使用 Knex 时遇到了一些奇怪的行为。下面的代码接收一个 table 数组,然后根据条件执行插入或更新。第一个 table 是 'transactionHeader',会被首先处理。然后,在同一个事务中处理 'transactionDetail' table 中的各行。新生成的键(rowids)累积在 'rowids' 数组中。
问题:主要问题是,如果 Knex 返回了错误,似乎没有办法退出 processTransactionDetail() 中的循环。throw 和 return 都无法退出循环或函数。这意味着如果处理 transactionDetail 时出错,它会继续处理完剩余的行之后才退出。
// Question code, as posted: writes the transaction header inside a Knex
// transaction, then hands the remaining tables to processTransactionDetail().
// Collects one { table, rowid } entry per row written; passed to callback below.
let rowids: any[] = [];
knex.transaction(function(trx) {
// Process transactionHeader
// NOTE: neither branch returns its query chain from this handler, and the
// result of processTransactionDetail() is not returned from the .then callbacks.
if (transactionHeader.rowid) {
// Update transactionHeader
trx('transaction')
.transacting(trx)
.update(transactionHeader)
.where('rowid', transactionHeader.rowid)
.then(function(transactionrowid) {
// Records the header's existing rowid, not the value update() resolved with.
rowids.push({ table: 'transaction', rowid: transactionHeader.rowid });
// Update transactionDetail rows.
// NOTE(review): transactionrowid is whatever update() resolved with,
// presumably an affected-row count rather than a key — confirm.
processTransactionDetail(transactionrowid, trx);
})
// A failed header query is forwarded to trx.rollback together with its error.
.catch(trx.rollback);
} else {
// Insert transactionHeader
trx('transaction')
.transacting(trx)
.insert(transactionHeader, 'rowid')
.then(function(transactionrowid) {
// Records the value insert() resolved with as the new header rowid.
rowids.push({ table: 'transaction', rowid: transactionrowid });
// Insert transactionDetail rows.
processTransactionDetail(transactionrowid, trx);
})
// A failed header query is forwarded to trx.rollback together with its error.
.catch(trx.rollback);
}
}).then(function(inserts) {
// Runs when the transaction promise fulfils; reports the collected rowids.
console.log('success!', rowids)
callback(null, { success: true }, { data: rowids })
return;
}).catch(function(error) {
// Runs when the transaction promise rejects. The failure is reported through
// callback's second argument; the first argument stays null.
console.error('error', error);
callback(null, {
success: false,
message: error.message
}, { data: rowids })
return;
});
/*
 * Process transactionDetail rows.
 *
 * Question code, kept as posted apart from restoring the missing `})` that
 * closes the first `.then` callback (without it the snippet does not parse).
 * The behaviour the question asks about is intentionally left in place.
 *
 * For every table after index 0 (index 0 is skipped by the loop), each row is
 * either updated or inserted depending on whether the table's first row
 * carries a truthy `rowid`.
 *
 * Why the loop cannot be exited from the `.catch` callbacks: both `for` loops
 * run to completion synchronously, creating one promise chain per row. The
 * `.then` / `.catch` callbacks only run later, so a `throw` or `return`
 * inside them affects that single chain and never the loop.
 *
 * transactionHeaderRowID: accepted but not used in the body.
 * trx: the Knex transaction object every query is attached to.
 */
function processTransactionDetail(transactionHeaderRowID: number, trx) {
  // Holds only the chain created last; earlier chains are overwritten.
  var promise: any;
  let table: TABLE;
  let rowid: number;
  for (let i = 1; i < tablesToProcess.length; i++) {
    table = tablesToProcess[i];
    // The first row's rowid decides update-vs-insert for the whole table and
    // is also the value used in every WHERE clause below.
    rowid = table.data[0].rowid;
    // Update
    if (rowid) {
      for (let row = 0; row < table.data.length; row++) {
        promise = knex(table.name)
          .transacting(trx)
          .update(table.data[row])
          .where('rowid', rowid)
          .then(function(rowid) {
            rowids.push({ table: table.name, rowid: rowid });
          })
          .catch(function(error) {
            // --------------------------------
            // **PROBLEM**: THERE IS NO WAY TO BREAK FROM THE LOOP
            // --------------------------------
            throw 'error';
            return;
            // --------------------------------
          })
      }
      // Insert
    } else {
      for (let row = 0; row < table.data.length; row++) {
        promise = knex(table.name)
          .transacting(trx)
          .insert(table.data[row])
          .then(function(rowid) {
            rowids.push({ table: table.name, rowid: rowid });
          })
          .catch(function(error) {
            // --------------------------------
            // **PROBLEM**: THERE IS NO WAY TO BREAK FROM THE LOOP
            // --------------------------------
            throw 'error';
            return;
            // --------------------------------
          });
      }
    }
  }
  // Commit is chained onto the last chain only. `promise` is still undefined
  // here if neither loop body ever ran.
  promise.then(function(x) {
    promise.then(trx.commit);
  });
}
更新: 这是一个合适的结构吗?不确定底部的错误处理程序是否真的需要。
// Question's second attempt, as posted: the same work restructured around
// Bluebird's Promise.mapSeries, with the row helpers nested in the handler.
knex.transaction(function(trx) {
// Update Row
// Called with (table, rowid) pre-bound and the current row as the last argument.
function updateRow(table, rowid, row) {
return knex(table.name)
.transacting(trx)
.update(row)
.where('rowid', rowid)
.then(function(rowid) {
// The callback parameter shadows the outer rowid; the pushed value is
// whatever update() resolved with.
rowids.push({
table: table.name,
rowid: rowid
});
});
}
// Insert Row
// Same calling convention as updateRow; rowid is accepted but not used.
function insertRow(table, rowid, row) {
return knex(table.name)
.transacting(trx)
.insert(row)
.then(function(rowid) {
rowids.push({
table: table.name,
rowid: rowid
});
});
}
// Process Tables
// NOTE: this chain is not returned from the transaction handler.
Promise.mapSeries(tablesToProcess, function(table) {
let rowid = table.data[0].rowid;
// choose the right function to apply to the rows
var fn = rowid ? updateRow : insertRow;
// fn needs table and rowid
fn = fn.bind(this, table, rowid);
// Process Rows
return Promise.mapSeries(table.data, fn)
.then(function(result) {
// result is an array with all the knex promises result
return result;
}).catch(function(err) {
console.log('an error happened');
//trx.rollback(); // QUESTION: IS THIS NEEDED?
throw err; // IS THERE A WAY TO
});
}).then(function(result) {
console.log('success', result);
trx.commit();
// callback(null, { success: true }, { data: rowids })
// return;
}).catch(function(error) {
// This handler does not rethrow; it rolls back and calls callback itself.
// NOTE(review): if trx.rollback() rejects the transaction promise, the outer
// .catch below would invoke callback a second time — confirm.
console.log('error', error);
trx.rollback();
callback(null, { success: false, message: error.message }, { data: rowids })
});
}).then(function(inserts) {
// Runs when the transaction promise fulfils.
console.log('success!', rowids)
callback(null, { success: true }, { data: rowids })
}).catch(function(error) {
// Runs when the transaction promise rejects.
console.log('error', error);
callback(null, { success: false, message: error.message }, { data: rowids })
});
您处理的是 Promise,因此必须使用支持 Promise 的方式来进行循环,例如 Bluebird 的 mapSeries():
var Promise = require('bluebird');
/**
 * Updates one row of `table` inside the surrounding transaction and records
 * the outcome in `rowids`.
 *
 * Intended to be called with `table` and `rowid` fixed and the current element
 * of `table.data` as `row` (this is what `Promise.mapSeries(table.data, fn)`
 * supplies: the item itself, not its index).
 *
 * @param table - table descriptor; `table.name` is the SQL table to update
 * @param rowid - value matched against the `rowid` column
 * @param row - the row object whose columns are written
 * @returns the promise chain for the query; it settles after the result has
 *   been appended to `rowids`
 */
function updateRow(table, rowid, row) {
  // `row` is already the row object, so it is passed straight to update();
  // indexing `table.data` with it would yield undefined.
  return knex(table.name)
    .transacting(trx)
    .update(row)
    .where('rowid', rowid)
    .then(function (result) {
      // NOTE(review): `result` is whatever update() resolved with, presumably
      // the affected-row count rather than a key — confirm.
      rowids.push({
        table: table.name,
        rowid: result
      });
    });
}
/**
 * Inserts one row into `table` inside the surrounding transaction and records
 * the outcome in `rowids`.
 *
 * Shares its signature with `updateRow` so either can be chosen per table and
 * driven by `Promise.mapSeries(table.data, fn)`, which supplies the row object
 * itself (not its index) as the last argument.
 *
 * @param table - table descriptor; `table.name` is the SQL table to insert into
 * @param rowid - unused; present only to keep the signature parallel to `updateRow`
 * @param row - the row object to insert
 * @returns the promise chain for the query; it settles after the result has
 *   been appended to `rowids`
 */
function insertRow(table, rowid, row) {
  // `row` is already the row object, so it is passed straight to insert();
  // indexing `table.data` with it would yield undefined.
  return knex(table.name)
    .transacting(trx)
    .insert(row)
    .then(function (result) {
      // Stores whatever insert() resolved with as the new row's id.
      rowids.push({
        table: table.name,
        rowid: result
      });
    });
}
// Process the tables strictly one after another. mapSeries waits for each
// returned promise before moving on, so the first rejected query stops the
// iteration: no further rows or tables are processed.
Promise.mapSeries(tablesToProcess, function (table) {
  // Declared locally; the bare assignment created an implicit global.
  const rowid = table.data[0].rowid;
  // A truthy rowid on the table's first row selects update, otherwise insert.
  const fn = rowid ? updateRow : insertRow;
  // Rows are handled in series too; mapSeries supplies each row object, while
  // table and rowid are fixed through the closure.
  return Promise.mapSeries(table.data, function (row) {
    return fn(table, rowid, row);
  });
}).then(function () {
  console.log('all is good');
  // you can safely commit here
  trx.commit();
}).catch(function (err) {
  console.log('an error happened');
  // Single rollback point for any failure above (an inner per-table rollback
  // would roll back twice). The error is handed over so the cause is kept.
  trx.rollback(err);
});
更新
关于您的问题:
// Corrected structure: the handler returns the whole promise chain, so Knex
// commits when it fulfils and rolls back when it rejects. No manual
// trx.commit() / trx.rollback() is needed; a bare trx.rollback() would settle
// the transaction before the real error reaches the outer catch.
knex.transaction(function (trx) {
  // Update Row: writes `row` where the rowid column equals `rowid`.
  function updateRow(table, rowid, row) {
    return knex(table.name)
      .transacting(trx)
      .update(row)
      .where('rowid', rowid)
      .then(function (result) {
        // NOTE(review): `result` is whatever update() resolved with,
        // presumably the affected-row count — confirm.
        rowids.push({
          table: table.name,
          rowid: result
        });
      });
  }
  // Insert Row: `rowid` is unused, kept so both helpers share one signature.
  function insertRow(table, rowid, row) {
    return knex(table.name)
      .transacting(trx)
      .insert(row)
      .then(function (result) {
        rowids.push({
          table: table.name,
          rowid: result
        });
      });
  }
  // you need to return here so the 'then(function (inserts)' and the catch work
  // Process Tables
  return Promise.mapSeries(tablesToProcess, function (table) {
    const rowid = table.data[0].rowid;
    // choose the right function to apply to the rows
    const fn = rowid ? updateRow : insertRow;
    // Process Rows (in series; the first failure stops the iteration)
    return Promise.mapSeries(table.data, function (row) {
      return fn(table, rowid, row);
    }).catch(function (err) {
      // this catch is not necessary,
      // you can remove it if you don't need to do something here
      console.log('an error happened');
      throw err;
    });
  }).then(function (result) {
    console.log('success', result);
    // Fulfilling the returned chain is what makes Knex commit.
    return result;
  }).catch(function (error) {
    console.log('error', error);
    // always rethrow error when you chain, if you don't, it's like the promise is resolved (ok)
    // Rejecting the returned chain is what makes Knex roll back.
    throw error;
  });
}).then(function (inserts) {
  console.log('success!', rowids)
  callback(null, { success: true }, { data: rowids })
}).catch(function (error) {
  console.log('error', error);
  callback(null, { success: false, message: error.message }, { data: rowids })
});
底部的两个 then 和两个 catch 可以合并。另外,为什么要使用回调?除非无法避免,否则最好不要把 Promise 和回调混在一起使用。