Sequelize 等到循环完成回调

Sequelize wait until loop finished with callback

来自 php 背景,我正在努力了解这个回调问题。

基本上我想获取一些行,然后我想遍历这些行并根据其他模型(不同的数据库)检查它们。我希望回电等到它们全部循环并检查完毕。

在 sequelize 循环遍历所有结果之前调用回调。

基本上我希望函数是'blocking'。我需要更改什么?

toexport.getlasttransactions = function(lower,upper,callback){
    var deferred = Q.defer();
    var transactionsToUpdate = [];
    ///////////////////////////
    // set import conditions //
    ///////////////////////////
    var lowerbound = (lower) ? lower.format() : moment.utc().subtract(10, 'minutes').format();
    var upperbound = (upper) ? upper.format() : moment.utc().format();

    ///////////////////////////////
    // get IDs From Failed syncs //
    ///////////////////////////////
    FailedSync.find({ limit: 100 })
    .then(function(res){
        var FailedIDs = [];
        _.each(res, function(value,index){
            FailedIDs.push(value.transaction_id);
        });

        // build condition
        var queryCondition = { where: { updated_at: { between: [lowerbound,upperbound] } }, limit: 3 };
        if(FailedIDs.length > 0){
            queryCondition = {
                where: Sequelize.and({ updated_at: { between: [lowerbound,upperbound] } },
                Sequelize.or(
                  { id: FailedIDs }
                ))
            }
        }
        //////////////////////////////
        // get Phoenix Transactions //
        //////////////////////////////
        PhoenixTransaction
        .findAll(queryCondition)
        .then(function(poenixTrx){

            _.each(poenixTrx, function(value, index){

                Transaction.findOne({ where: { id: value.id }})
                .then(function(result){

                    if(!result || result.length === 0){
                        transactionsToUpdate.push(value);
                        console.log('!result || result.length === 0')
                    }
                    else if(result && result.length === 1){
                        if(result.hash != value.hash){
                            transactionsToUpdate.push(value);
                            console.log('result.hash != poenixTrx[i].hash')
                        }
                    }




                })
                .catch(function(err) {
                  console.log(err)
                })


            })
            deferred.resolve(transactionsToUpdate);


        })
        .catch(function(err){
          throw new Error("Something went wrong getting PhoenixTransaction") 
        })

    })

    deferred.promise.nodeify(callback);
    return deferred.promise;    

}

理想情况下,您会希望使用像 Bluebird 的 reduce 和一系列承诺的东西,但我会提供一个 async.series 实现,因为它更容易理解。

安装异步

npm install async

在您的文件中需要它

var async = require('async')

然后这样实现:

        //////////////////////////////
        // get Phoenix Transactions //
        //////////////////////////////
        PhoenixTransaction
        .findAll(queryCondition)
        .then(function(poenixTrx){

            var queryArray = poenixTrx.map(function(value){
                return function(callback){
                    Transaction.findOne({ where: { id: value.id }})
                    .then(function(result){

                        if(!result || result.length === 0){
                            transactionsToUpdate.push(value);
                            console.log('!result || result.length === 0')
                        }
                        else if(result && result.length === 1){
                            if(result.hash != value.hash){
                                transactionsToUpdate.push(value);
                                console.log('result.hash != poenixTrx[i].hash')
                            }
                        }

                        // trigger callback with any result you want
                        callback(null, result)
                    })
                    .catch(function(err) {
                      console.log(err)
                      // trigger  error callback
                      callback(err)
                    })
                }

            })

            // async.series will loop through he queryArray, and execute each function one by one until they are all completed or an error is thrown.
            // for additional information see https://github.com/caolan/async#seriestasks-callback
            async.series(queryArray, function(err, callback){
                // after all your queries are done, execution will be here
                // resolve the promise with the transactionToUpdate array
                deferred.resolve(transactionsToUpdate);
            })


        })
        .catch(function(err){
          throw new Error("Something went wrong getting PhoenixTransaction") 
        })

老实说,整件事有点乱。特别是 promise/callback 的混淆可能会在某些时候给你带来麻烦。无论如何,您在 transactionsToUpdate 上使用 deferred.resolve,它只是一个数组,因此它会立即调用回调。

如果您保留该脚本,而不是使用 _.each 类似异步 (https://github.com/caolan/async) 的东西 运行 您的并行事务并将其用作回调。

它可能看起来像这样:

toexport.getlasttransactions = function(lower,upper,callback){
    var transactionsToUpdate = [];
    ///////////////////////////
    // set import conditions //
    ///////////////////////////
    var lowerbound = (lower) ? lower.format() : moment.utc().subtract(10, 'minutes').format();
    var upperbound = (upper) ? upper.format() : moment.utc().format();

    ///////////////////////////////
    // get IDs From Failed syncs //
    ///////////////////////////////
    FailedSync.find({ limit: 100 })
    .then(function(res){
        var FailedIDs = [];
        _.each(res, function(value,index){
            FailedIDs.push(value.transaction_id);
        });

        // build condition
        var queryCondition = { where: { updated_at: { between: [lowerbound,upperbound] } }, limit: 3 };
        if(FailedIDs.length > 0){
            queryCondition = {
                where: Sequelize.and({ updated_at: { between: [lowerbound,upperbound] } },
                Sequelize.or(
                  { id: FailedIDs }
                ))
            }
        }
        //////////////////////////////
        // get Phoenix Transactions //
        //////////////////////////////
        PhoenixTransaction
        .findAll(queryCondition)
        .then(function(poenixTrx){

            async.each(poenixTrx, function(value, next){

                Transaction.findOne({ where: { id: value.id }})
                .then(function(result){

                    if(!result || result.length === 0){
                        transactionsToUpdate.push(value);
                        console.log('!result || result.length === 0')
                    }
                    else if(result && result.length === 1){
                        if(result.hash != value.hash){
                            transactionsToUpdate.push(value);
                            console.log('result.hash != poenixTrx[i].hash')
                        }
                    }

                    next();
                })
                .catch(function(err) {
                  console.log(err)
                })


            }, function(err) {
              //Return the array transactionsToUpdate in your callback for further use
              return callback(err, transactionsToUpdate);
            });
        })
        .catch(function(err){
          throw new Error("Something went wrong getting PhoenixTransaction") 
        })

    })

}

这就是回调的方式。 但是您需要决定要使用什么:回调或承诺。不要同时使用两者(如:如果你的方法需要回调,它不应该 return 一个承诺,或者如果它 return 是一个承诺,它不应该期望回调)。

另外,如果你使用回调你不想抛出错误,你只需调用回调并在回调中给出错误 - 使用你的方法的任何人都可以检查回调中的错误并处理它。

希望这对你有点意义,我知道如果你来自 php 之类的东西,整个回调和承诺的事情会有点奇怪,它需要一些时间来适应 :)

您的代码中有很多新承诺用户的模式:

  • 你在不需要的时候使用了延迟。
  • 您没有使用 promise 聚合方法
  • 你不是在适当的地方等待,而是在嵌套。

承诺代表 随着时间推移的价值。您可以稍后使用 promises 并通过 then 访问它们的结果,而不仅仅是立即 - Sequelize 的 promises 基于 bluebird 并提供丰富的 API 为您聚合。 这是清理代码的注释版本 - 注意它不是嵌套:

toexport.getlasttransactions = function(lower,upper){ // no need for callback
    var lowerbound = (lower || moment.utc().subtract(10, 'minutes')).format();
    var upperbound = (upper || moment.utc()).format();
    // use `map` over a `each` with a push.
    var failedIds = FailedSync.find({ limit: 100 }).map(function(value){ 
        return value.transaction_id;
    });
    // build condition.
    var queryCondition = {
        where: { updated_at: { between: [lowerbound,upperbound] } }, limit: 3 
    };
    var query = failedIds.then(function(ids){ // use promise as proxy
        if(ids.length === 0) return queryCondition;
        return { // You can return a value or a promise from `then`
            where: Sequelize.and({ updated_at: { between: [lowerbound,upperbound] } },
                   Sequelize.or({ id: ids});
        };
    });
    var pheonixTransactions = query.then(function(condition){
        return PhoenixTransaction.findAll(queryCondition); // filter based on result
    });
    return pheonixTransactions.map(function(value){ // again, map over each
        return Transaction.findOne({ where: { id: value.id }}); // get the relevant one
    }).filter(function(result){ // filter over if chain and push
        return (!result || result.length === 0) || 
               ((result && result.length === 1) && result.hash != value.hash);
    });
};

感谢您解释差异。我认为使用 promises 是前进的方向,因为它使代码看起来更好并避免了这种情况 "callback hell".

例如:

PhoenixSyncTransactions.getlasttransactions(lastTimeSynced,null)
.then(function(res){

    return PersistTransaction.prepareTransactions(res).then(function(preparedTrx){
      return preparedTrx;
    })
}).then(function(preparedTrx){
    return PersistTransaction.persistToDB(preparedTrx).then(function(Processes){
      return Processes;
    })
})
.then(function(Processes){
    return PersistTransaction.checkIfMultiProcess(Processes).then(function(result){
      return result;
    })

})
.then(function(result){
  console.log('All jobs done');
})

整个代码更易于阅读。