Meteor Mongo 集合查找 forEach 游标迭代并保存到 ElasticSearch 问题

Meteor Mongo Collections find forEach cursor iteration and saving to ElasticSearch Problem

我有连接到 MongoDB 的 Meteor 应用程序。
在 mongo 中,我有一个 table,它有大约 70 万条记录。 我每周都有一个 cron 作业,我从 table 中读取所有记录(使用 Mongo 游标)并以 10k 为一批,我想将它们插入 Elastic Search 以便它们被索引。

let articles = []
Collections.Articles.find({}).forEach(function(doc) {
        articles.push({ 
            index: {_index: 'main', _type: 'article', _id: doc.id }
        },
        doc);
        if (0 === articles.length % 10000) {
          client.bulk({   maxRetries: 5,  index: 'main',   type: 'article',    body: articles   })
          data = []
        }
    })

由于 for each 是同步的,在继续之前遍历每条记录,并且 client.bulk 是异步的,这会使弹性搜索服务器过载,并因内存不足异常而崩溃。
有没有办法在插入完成期间暂停 forEach?我尝试了 async/await 但这似乎并不奏效。

let articles = []
Collections.Articles.find({}).forEach(async function(doc) {
        articles.push({ 
            index: {_index: 'main', _type: 'article', _id: doc.id }
        },
        doc);
        if (0 === articles.length % 10000) {
          await client.bulk({   maxRetries: 5,  index: 'main',   type: 'article',    body: articles   })
          data = []
        }
    })

有什么方法可以实现吗?

编辑:我正在努力实现这样的目标——如果我使用 promises

let articles = []
Collections.Articles.find({}).forEach(function(doc) {
        articles.push({ 
            index: {_index: 'main', _type: 'article', _id: doc.id }
        },
        doc);
        if (0 === articles.length % 10000) {
        // Pause FETCHING rows with forEach
            client.bulk({   maxRetries: 5,  index: 'main',   type: 'article',    body: articles    }).then(() => {
                                          console.log('inserted')
                                          // RESUME FETCHING rows with forEach
                                          console.log("RESUME READING");  
            })
          data = []
        }
    })

如果您担心不受限制的迭代,则可以使用内部 Meteor._sleepForMs 方法,该方法允许您在 sync-styled 代码中设置异步超时:

Collections.Articles.find().forEach((doc, index) => {
  console.log(index, doc._id)
  Meteor._sleepForMs(timeout)
})

现在这在 Meteor 环境中工作正常(Meteor.startupMeteor.methodsMeteor.publish)。

你的 cron 很可能不在这个环境中(= Fiber)所以你可以写一个绑定环境的包装器:

const bound = fct => Meteor.bindEnvironment(fct)

const iterateSlow = bound(function (timeout) {
  Collections.Articles.find().forEach((doc, index) => {
    console.log(index, doc._id)
    Meteor._sleepForMs(timeout)
  })
  return true
})

iterateSlow(50) // iterates with 50ms timeout

这是一个完整的最小示例,您可以使用新项目重现:

// create a minimal collection
const MyDocs = new Mongo.Collection('myDocs')

// fill the collection
Meteor.startup(() => {
  for (let i = 0; i < 100; i++) {
    MyDocs.insert({})
  }
})

// bind helper
const bound = fct => Meteor.bindEnvironment(fct)

// iterate docs with interval between
const iterateSlow = bound(function (timeout) {
  MyDocs.find().forEach((doc, index) => {
    console.log(index, doc._id)
    Meteor._sleepForMs(timeout)
  })
  return true
})

// simulate external environment, like when cron runs
setTimeout(() => {
  iterateSlow(50)
}, 2000)

设法让它与 ES2018 异步迭代一起工作
从中得到一个想法
这是有效的代码

let articles = []
let cursor = Collections.Articles.find({})

for await (doc of cursor) {
    articles.push({ 
        index: {_index: 'main', _type: 'article', _id: doc.id }
    },
    doc);
    if (articles.length === 10000) {
        await client.bulk({   maxRetries: 5,  index: 'trusted',   type: 'artikel',    body: articles   })
        articles = []
    }
}

这工作正常,它设法将所有记录插入 Elastic Search 而不会崩溃。