如何对 sails-mongo 进行非常大的查询?

How to do a very large query on sails-mongo?

我正在使用 sails 0.11.2。使用最新的 sails-mongo 适配器。 我有一个非常大的数据库(千兆字节的数据),主要包含时间戳和值。我使用蓝图 api.

对其进行查询

如果我使用 localhost:1337/datatable?limit=100000000000 进行查询,则 nodejs 在 0.12 上挂起且使用了很多 CPU,并在 v4 上崩溃。它在 toJSON 函数上崩溃。

我发现我需要对 API 进行多次查询。但是我不知道如何去做。

如何对 "don't explode" 我的服务器进行多个查询?


更新:

在具有最新水线和 sails-mongo 的较新版本 0.12.3 上,查询变得更加顺畅。云上的崩溃是我没有足够的 RAM 来处理同一个 T2.micro 实例上的 sailsjs 和 mongodb。

我已将 mongodb 服务器移至 M3.Medium 实例。现在服务器不再崩溃,但它冻结了。我正在使用 skip limit,它对 sails.js 很好用,但对 mongodb 是一种资源浪费!

Mongodb 使用 limit = skip + limit 进行内部查询。然后将光标移动到所需的数据和returns。当您在分页中进行大量操作时,您正在使用大量内部查询。随着查询大小的增加。

正如 this article 所解释的那样,解决 MongoDB 中资源浪费的方法是避免使用 skip 并巧妙地使用 _id 作为查询的一部分.

我没有使用 sails mongo 但我确实通过在 nodejs 中使用 mongo 驱动程序实现了上述想法:

/**
 * Motivation:
 * Wanted to put together some code that used:
 *  - BlueBird (promises)
 *  - MongoDB NodeJS Driver
 *  - and paging that did not rely on skip()
 *
 * References:
 * Based on articles such as:
 * https://scalegrid.io/blog/fast-paging-with-mongodb/
 * and GitHub puclic code searches such as:
 * https://github.com/search?utf8=%E2%9C%93&q=bluebird+MongoClient+_id+find+limit+gt+language%3Ajavascript+&type=Code&ref=searchresults
 * which yielded smaple code hits such as:
 * https://github.com/HabitRPG/habitrpg/blob/28f2e9c356d7053884107d90d04e28dde75fa81b/migrations/api_v3/coupons.js#L71
 */

  var Promise = require('bluebird'); // jshint ignore:line
  var _ = require('lodash');
  var MongoClient = require('mongodb').MongoClient;
  var dbHandleForShutDowns;

  // option a: great for debugging
  var logger = require('tracer').console();
  // option b: general purpose use
  //var logger = console;

  //...

    var getPage = function getPage(db, collectionName, query, projection, pageSize, processPage) {
      //console.log('DEBUG', 'filter:', JSON.stringify(query,null,2));
      projection = (projection) ? projection['_id']=true : {'_id':true};
      return db
        .collection(collectionName)
        .find(query)
        .project(projection)
        .sort({'_id':1}).limit(pageSize)
        .toArray() // cursor methods return promises: http://mongodb.github.io/node-mongodb-native/2.1/api/Cursor.html#toArray
        .then(function processPagedResults(documents) {
          if (!documents || documents.length < 1) {
            // stop - no data left to traverse
            return Promise.resolve();
          }
          else {
            if (documents.length < pageSize) {
              // stop - last page
              return processPage(documents);
            }
            else {
              return processPage(documents) // process the results of the current page
                .then(function getNextPage(){ // then go get the next page
                  var last_id = documents[documents.length-1]['_id'];
                  query['_id'] = {'$gt' : last_id};
                  return getPage(db, collectionName, query, projection, pageSize, processPage);
                });
            }
          }
        });
    };

    //...

    return MongoClient
      .connect(params.dbUrl, {
        promiseLibrary: Promise
      })
      .then(function(db) {
        dbHandleForShutDowns = db;
        return getPage(db, collectionName, {}, {}, 5, function processPage(pagedDocs){console.log('do something with', pagedDocs);})
          .finally(db.close.bind(db));
      })
      .catch(function(err) {
        console.error("ERROR", err);
        dbHandleForShutDowns.close();
      });

以下两节显示代码如何操作 _id 并使其成为查询的一部分:

 .sort({'_id':1}).limit(pageSize)
 // [...]
var last_id = documents[documents.length-1]['_id'];
query['_id'] = {'$gt' : last_id};

整体代码流程:

  1. getPage()处理工作,您可以根据自己的喜好设置pageSizequery

    return getPage(db, collectionName, {}, {}, 5, function processPage(pagedDocs){console.log('do something with', pagedDocs);})
    
  2. 方法签名:

    var getPage = function getPage(db, collectionName, query, projection, pageSize, processPage) {
    
  3. 一旦可用就处理pagedResults

    return processPage(documents) // process the results of the current page
    
  4. 移至下一页:

    return getPage(db, collectionName, query, projection, pageSize, processPage);
    
  5. 当没有更多数据剩余时,代码将停止:

    // stop - no data left to traverse
    return Promise.resolve();
    
  6. 否则在处理最后一页数据时会停止:

    // stop - last page
    return processPage(documents);
    

我希望这能提供一些灵感,即使它不是满足您需求的确切解决方案。

1. 运行合计

const SailsMongoQuery = require('sails-mongo/lib/query/index.js')
const SailsMongoMatchMongoId = require('sails-mongo/lib/utils.js').matchMongoId
const fn = model.find(query).paginate(paginate)
const criteria = fn._criteria
const queryLib = new SailsMongoQuery(criteria, {})
const queryOptions = _.omit(queryLib.criteria, 'where')
const where = queryLib.criteria.where || {}
const queryWhere = Object.keys(where).reduce((acc, key) => {
  const val = where[key]
  acc[key] = SailsMongoMatchMongoId(val) ? new ObjectID(val) : val
  return acc
}, {})

const aggregate = [
  { $match: queryWhere }
].concat(Object.keys(queryOptions).map(key => ({ [`$${key}`]: queryOptions[key] })))

// console.log('roge aggregate --->', JSON.stringify(aggregate, null, 2))

model.native((err, collection) => {
  if (err) return callback(err)
  collection.aggregate(aggregate, { allowDiskUse: true }).toArray(function (err, docs) {
    if (err) return callback(err)
    const pk = primaryKey === 'id' ? '_id' : primaryKey
    ids = docs.reduce((acc, doc) => [...acc, doc[pk]], [])
    callback()
  })
})

2。 运行 按 id`s

查找帆
query = Object.assign({}, query, { [primaryKey]: ids }) // check primary key in sails model
fn = model.find(query) // .populate or another method
fn.exec((err, results) => { console.log('result ->>>>', err, results) })