Sequelize 等到循环完成回调
Sequelize wait until loop finished with callback
来自 php 背景,我正在努力了解这个回调问题。
基本上我想获取一些行,然后我想遍历这些行并根据其他模型(不同的数据库)检查它们。我希望回电等到它们全部循环并检查完毕。
在 sequelize 循环遍历所有结果之前调用回调。
基本上我希望函数是'blocking'。我需要更改什么?
toexport.getlasttransactions = function(lower,upper,callback){
var deferred = Q.defer();
var transactionsToUpdate = [];
///////////////////////////
// set import conditions //
///////////////////////////
var lowerbound = (lower) ? lower.format() : moment.utc().subtract(10, 'minutes').format();
var upperbound = (upper) ? upper.format() : moment.utc().format();
///////////////////////////////
// get IDs From Failed syncs //
///////////////////////////////
FailedSync.find({ limit: 100 })
.then(function(res){
var FailedIDs = [];
_.each(res, function(value,index){
FailedIDs.push(value.transaction_id);
});
// build condition
var queryCondition = { where: { updated_at: { between: [lowerbound,upperbound] } }, limit: 3 };
if(FailedIDs.length > 0){
queryCondition = {
where: Sequelize.and({ updated_at: { between: [lowerbound,upperbound] } },
Sequelize.or(
{ id: FailedIDs }
))
}
}
//////////////////////////////
// get Phoenix Transactions //
//////////////////////////////
PhoenixTransaction
.findAll(queryCondition)
.then(function(poenixTrx){
_.each(poenixTrx, function(value, index){
Transaction.findOne({ where: { id: value.id }})
.then(function(result){
if(!result || result.length === 0){
transactionsToUpdate.push(value);
console.log('!result || result.length === 0')
}
else if(result && result.length === 1){
if(result.hash != value.hash){
transactionsToUpdate.push(value);
console.log('result.hash != poenixTrx[i].hash')
}
}
})
.catch(function(err) {
console.log(err)
})
})
deferred.resolve(transactionsToUpdate);
})
.catch(function(err){
throw new Error("Something went wrong getting PhoenixTransaction")
})
})
deferred.promise.nodeify(callback);
return deferred.promise;
}
理想情况下,您会希望使用像 Bluebird 的 reduce 和一系列承诺的东西,但我会提供一个 async.series 实现,因为它更容易理解。
安装异步
npm install async
在您的文件中需要它
var async = require('async')
然后这样实现:
//////////////////////////////
// get Phoenix Transactions //
//////////////////////////////
PhoenixTransaction
.findAll(queryCondition)
.then(function(poenixTrx){
var queryArray = poenixTrx.map(function(value){
return function(callback){
Transaction.findOne({ where: { id: value.id }})
.then(function(result){
if(!result || result.length === 0){
transactionsToUpdate.push(value);
console.log('!result || result.length === 0')
}
else if(result && result.length === 1){
if(result.hash != value.hash){
transactionsToUpdate.push(value);
console.log('result.hash != poenixTrx[i].hash')
}
}
// trigger callback with any result you want
callback(null, result)
})
.catch(function(err) {
console.log(err)
// trigger error callback
callback(err)
})
}
})
// async.series will loop through he queryArray, and execute each function one by one until they are all completed or an error is thrown.
// for additional information see https://github.com/caolan/async#seriestasks-callback
async.series(queryArray, function(err, callback){
// after all your queries are done, execution will be here
// resolve the promise with the transactionToUpdate array
deferred.resolve(transactionsToUpdate);
})
})
.catch(function(err){
throw new Error("Something went wrong getting PhoenixTransaction")
})
老实说,整件事有点乱。特别是 promise/callback 的混淆可能会在某些时候给你带来麻烦。无论如何,您在 transactionsToUpdate 上使用 deferred.resolve,它只是一个数组,因此它会立即调用回调。
如果您保留该脚本,而不是使用 _.each 类似异步 (https://github.com/caolan/async) 的东西 运行 您的并行事务并将其用作回调。
它可能看起来像这样:
toexport.getlasttransactions = function(lower,upper,callback){
var transactionsToUpdate = [];
///////////////////////////
// set import conditions //
///////////////////////////
var lowerbound = (lower) ? lower.format() : moment.utc().subtract(10, 'minutes').format();
var upperbound = (upper) ? upper.format() : moment.utc().format();
///////////////////////////////
// get IDs From Failed syncs //
///////////////////////////////
FailedSync.find({ limit: 100 })
.then(function(res){
var FailedIDs = [];
_.each(res, function(value,index){
FailedIDs.push(value.transaction_id);
});
// build condition
var queryCondition = { where: { updated_at: { between: [lowerbound,upperbound] } }, limit: 3 };
if(FailedIDs.length > 0){
queryCondition = {
where: Sequelize.and({ updated_at: { between: [lowerbound,upperbound] } },
Sequelize.or(
{ id: FailedIDs }
))
}
}
//////////////////////////////
// get Phoenix Transactions //
//////////////////////////////
PhoenixTransaction
.findAll(queryCondition)
.then(function(poenixTrx){
async.each(poenixTrx, function(value, next){
Transaction.findOne({ where: { id: value.id }})
.then(function(result){
if(!result || result.length === 0){
transactionsToUpdate.push(value);
console.log('!result || result.length === 0')
}
else if(result && result.length === 1){
if(result.hash != value.hash){
transactionsToUpdate.push(value);
console.log('result.hash != poenixTrx[i].hash')
}
}
next();
})
.catch(function(err) {
console.log(err)
})
}, function(err) {
//Return the array transactionsToUpdate in your callback for further use
return callback(err, transactionsToUpdate);
});
})
.catch(function(err){
throw new Error("Something went wrong getting PhoenixTransaction")
})
})
}
这就是回调的方式。
但是您需要决定要使用什么:回调或承诺。不要同时使用两者(如:如果你的方法需要回调,它不应该 return 一个承诺,或者如果它 return 是一个承诺,它不应该期望回调)。
另外,如果你使用回调你不想抛出错误,你只需调用回调并在回调中给出错误 - 使用你的方法的任何人都可以检查回调中的错误并处理它。
希望这对你有点意义,我知道如果你来自 php 之类的东西,整个回调和承诺的事情会有点奇怪,它需要一些时间来适应 :)
您的代码中有很多新承诺用户的模式:
- 你在不需要的时候使用了延迟。
- 您没有使用 promise 聚合方法
- 你不是在适当的地方等待,而是在嵌套。
承诺代表 随着时间推移的价值。您可以稍后使用 promises 并通过 then
访问它们的结果,而不仅仅是立即 - Sequelize 的 promises 基于 bluebird 并提供丰富的 API 为您聚合。
这是清理代码的注释版本 - 注意它不是嵌套:
toexport.getlasttransactions = function(lower,upper){ // no need for callback
var lowerbound = (lower || moment.utc().subtract(10, 'minutes')).format();
var upperbound = (upper || moment.utc()).format();
// use `map` over a `each` with a push.
var failedIds = FailedSync.find({ limit: 100 }).map(function(value){
return value.transaction_id;
});
// build condition.
var queryCondition = {
where: { updated_at: { between: [lowerbound,upperbound] } }, limit: 3
};
var query = failedIds.then(function(ids){ // use promise as proxy
if(ids.length === 0) return queryCondition;
return { // You can return a value or a promise from `then`
where: Sequelize.and({ updated_at: { between: [lowerbound,upperbound] } },
Sequelize.or({ id: ids});
};
});
var pheonixTransactions = query.then(function(condition){
return PhoenixTransaction.findAll(queryCondition); // filter based on result
});
return pheonixTransactions.map(function(value){ // again, map over each
return Transaction.findOne({ where: { id: value.id }}); // get the relevant one
}).filter(function(result){ // filter over if chain and push
return (!result || result.length === 0) ||
((result && result.length === 1) && result.hash != value.hash);
});
};
感谢您解释差异。我认为使用 promises 是前进的方向,因为它使代码看起来更好并避免了这种情况 "callback hell".
例如:
PhoenixSyncTransactions.getlasttransactions(lastTimeSynced,null)
.then(function(res){
return PersistTransaction.prepareTransactions(res).then(function(preparedTrx){
return preparedTrx;
})
}).then(function(preparedTrx){
return PersistTransaction.persistToDB(preparedTrx).then(function(Processes){
return Processes;
})
})
.then(function(Processes){
return PersistTransaction.checkIfMultiProcess(Processes).then(function(result){
return result;
})
})
.then(function(result){
console.log('All jobs done');
})
整个代码更易于阅读。
来自 php 背景,我正在努力了解这个回调问题。
基本上我想获取一些行,然后我想遍历这些行并根据其他模型(不同的数据库)检查它们。我希望回电等到它们全部循环并检查完毕。
在 sequelize 循环遍历所有结果之前调用回调。
基本上我希望函数是'blocking'。我需要更改什么?
toexport.getlasttransactions = function(lower,upper,callback){
var deferred = Q.defer();
var transactionsToUpdate = [];
///////////////////////////
// set import conditions //
///////////////////////////
var lowerbound = (lower) ? lower.format() : moment.utc().subtract(10, 'minutes').format();
var upperbound = (upper) ? upper.format() : moment.utc().format();
///////////////////////////////
// get IDs From Failed syncs //
///////////////////////////////
FailedSync.find({ limit: 100 })
.then(function(res){
var FailedIDs = [];
_.each(res, function(value,index){
FailedIDs.push(value.transaction_id);
});
// build condition
var queryCondition = { where: { updated_at: { between: [lowerbound,upperbound] } }, limit: 3 };
if(FailedIDs.length > 0){
queryCondition = {
where: Sequelize.and({ updated_at: { between: [lowerbound,upperbound] } },
Sequelize.or(
{ id: FailedIDs }
))
}
}
//////////////////////////////
// get Phoenix Transactions //
//////////////////////////////
PhoenixTransaction
.findAll(queryCondition)
.then(function(poenixTrx){
_.each(poenixTrx, function(value, index){
Transaction.findOne({ where: { id: value.id }})
.then(function(result){
if(!result || result.length === 0){
transactionsToUpdate.push(value);
console.log('!result || result.length === 0')
}
else if(result && result.length === 1){
if(result.hash != value.hash){
transactionsToUpdate.push(value);
console.log('result.hash != poenixTrx[i].hash')
}
}
})
.catch(function(err) {
console.log(err)
})
})
deferred.resolve(transactionsToUpdate);
})
.catch(function(err){
throw new Error("Something went wrong getting PhoenixTransaction")
})
})
deferred.promise.nodeify(callback);
return deferred.promise;
}
理想情况下,您会希望使用像 Bluebird 的 reduce 和一系列承诺的东西,但我会提供一个 async.series 实现,因为它更容易理解。
安装异步
npm install async
在您的文件中需要它
var async = require('async')
然后这样实现:
//////////////////////////////
// get Phoenix Transactions //
//////////////////////////////
PhoenixTransaction
.findAll(queryCondition)
.then(function(poenixTrx){
var queryArray = poenixTrx.map(function(value){
return function(callback){
Transaction.findOne({ where: { id: value.id }})
.then(function(result){
if(!result || result.length === 0){
transactionsToUpdate.push(value);
console.log('!result || result.length === 0')
}
else if(result && result.length === 1){
if(result.hash != value.hash){
transactionsToUpdate.push(value);
console.log('result.hash != poenixTrx[i].hash')
}
}
// trigger callback with any result you want
callback(null, result)
})
.catch(function(err) {
console.log(err)
// trigger error callback
callback(err)
})
}
})
// async.series will loop through he queryArray, and execute each function one by one until they are all completed or an error is thrown.
// for additional information see https://github.com/caolan/async#seriestasks-callback
async.series(queryArray, function(err, callback){
// after all your queries are done, execution will be here
// resolve the promise with the transactionToUpdate array
deferred.resolve(transactionsToUpdate);
})
})
.catch(function(err){
throw new Error("Something went wrong getting PhoenixTransaction")
})
老实说,整件事有点乱。特别是 promise/callback 的混淆可能会在某些时候给你带来麻烦。无论如何,您在 transactionsToUpdate 上使用 deferred.resolve,它只是一个数组,因此它会立即调用回调。
如果您保留该脚本,而不是使用 _.each 类似异步 (https://github.com/caolan/async) 的东西 运行 您的并行事务并将其用作回调。
它可能看起来像这样:
toexport.getlasttransactions = function(lower,upper,callback){
var transactionsToUpdate = [];
///////////////////////////
// set import conditions //
///////////////////////////
var lowerbound = (lower) ? lower.format() : moment.utc().subtract(10, 'minutes').format();
var upperbound = (upper) ? upper.format() : moment.utc().format();
///////////////////////////////
// get IDs From Failed syncs //
///////////////////////////////
FailedSync.find({ limit: 100 })
.then(function(res){
var FailedIDs = [];
_.each(res, function(value,index){
FailedIDs.push(value.transaction_id);
});
// build condition
var queryCondition = { where: { updated_at: { between: [lowerbound,upperbound] } }, limit: 3 };
if(FailedIDs.length > 0){
queryCondition = {
where: Sequelize.and({ updated_at: { between: [lowerbound,upperbound] } },
Sequelize.or(
{ id: FailedIDs }
))
}
}
//////////////////////////////
// get Phoenix Transactions //
//////////////////////////////
PhoenixTransaction
.findAll(queryCondition)
.then(function(poenixTrx){
async.each(poenixTrx, function(value, next){
Transaction.findOne({ where: { id: value.id }})
.then(function(result){
if(!result || result.length === 0){
transactionsToUpdate.push(value);
console.log('!result || result.length === 0')
}
else if(result && result.length === 1){
if(result.hash != value.hash){
transactionsToUpdate.push(value);
console.log('result.hash != poenixTrx[i].hash')
}
}
next();
})
.catch(function(err) {
console.log(err)
})
}, function(err) {
//Return the array transactionsToUpdate in your callback for further use
return callback(err, transactionsToUpdate);
});
})
.catch(function(err){
throw new Error("Something went wrong getting PhoenixTransaction")
})
})
}
这就是回调的方式。 但是您需要决定要使用什么:回调或承诺。不要同时使用两者(如:如果你的方法需要回调,它不应该 return 一个承诺,或者如果它 return 是一个承诺,它不应该期望回调)。
另外,如果你使用回调你不想抛出错误,你只需调用回调并在回调中给出错误 - 使用你的方法的任何人都可以检查回调中的错误并处理它。
希望这对你有点意义,我知道如果你来自 php 之类的东西,整个回调和承诺的事情会有点奇怪,它需要一些时间来适应 :)
您的代码中有很多新承诺用户的模式:
- 你在不需要的时候使用了延迟。
- 您没有使用 promise 聚合方法
- 你不是在适当的地方等待,而是在嵌套。
承诺代表 随着时间推移的价值。您可以稍后使用 promises 并通过 then
访问它们的结果,而不仅仅是立即 - Sequelize 的 promises 基于 bluebird 并提供丰富的 API 为您聚合。
这是清理代码的注释版本 - 注意它不是嵌套:
toexport.getlasttransactions = function(lower,upper){ // no need for callback
var lowerbound = (lower || moment.utc().subtract(10, 'minutes')).format();
var upperbound = (upper || moment.utc()).format();
// use `map` over a `each` with a push.
var failedIds = FailedSync.find({ limit: 100 }).map(function(value){
return value.transaction_id;
});
// build condition.
var queryCondition = {
where: { updated_at: { between: [lowerbound,upperbound] } }, limit: 3
};
var query = failedIds.then(function(ids){ // use promise as proxy
if(ids.length === 0) return queryCondition;
return { // You can return a value or a promise from `then`
where: Sequelize.and({ updated_at: { between: [lowerbound,upperbound] } },
Sequelize.or({ id: ids});
};
});
var pheonixTransactions = query.then(function(condition){
return PhoenixTransaction.findAll(queryCondition); // filter based on result
});
return pheonixTransactions.map(function(value){ // again, map over each
return Transaction.findOne({ where: { id: value.id }}); // get the relevant one
}).filter(function(result){ // filter over if chain and push
return (!result || result.length === 0) ||
((result && result.length === 1) && result.hash != value.hash);
});
};
感谢您解释差异。我认为使用 promises 是前进的方向,因为它使代码看起来更好并避免了这种情况 "callback hell".
例如:
PhoenixSyncTransactions.getlasttransactions(lastTimeSynced,null)
.then(function(res){
return PersistTransaction.prepareTransactions(res).then(function(preparedTrx){
return preparedTrx;
})
}).then(function(preparedTrx){
return PersistTransaction.persistToDB(preparedTrx).then(function(Processes){
return Processes;
})
})
.then(function(Processes){
return PersistTransaction.checkIfMultiProcess(Processes).then(function(result){
return result;
})
})
.then(function(result){
console.log('All jobs done');
})
整个代码更易于阅读。