Meteor Mongo 集合查找 forEach 游标迭代并保存到 ElasticSearch 问题
Meteor Mongo Collections find forEach cursor iteration and saving to ElasticSearch Problem
我有连接到 MongoDB 的 Meteor 应用程序。
在 mongo 中,我有一个 table,它有大约 70 万条记录。
我每周都有一个 cron 作业,我从 table 中读取所有记录(使用 Mongo 游标)并以 10k 为一批,我想将它们插入 Elastic Search 以便它们被索引。
let articles = []
Collections.Articles.find({}).forEach(function(doc) {
articles.push({
index: {_index: 'main', _type: 'article', _id: doc.id }
},
doc);
if (0 === articles.length % 10000) {
client.bulk({ maxRetries: 5, index: 'main', type: 'article', body: articles })
data = []
}
})
由于 for each 是同步的,在继续之前遍历每条记录,并且 client.bulk 是异步的,这会使弹性搜索服务器过载,并因内存不足异常而崩溃。
有没有办法在插入完成期间暂停 forEach?我尝试了 async/await 但这似乎并不奏效。
let articles = []
Collections.Articles.find({}).forEach(async function(doc) {
articles.push({
index: {_index: 'main', _type: 'article', _id: doc.id }
},
doc);
if (0 === articles.length % 10000) {
await client.bulk({ maxRetries: 5, index: 'main', type: 'article', body: articles })
data = []
}
})
有什么方法可以实现吗?
编辑:我正在努力实现这样的目标——如果我使用 promises
let articles = []
Collections.Articles.find({}).forEach(function(doc) {
articles.push({
index: {_index: 'main', _type: 'article', _id: doc.id }
},
doc);
if (0 === articles.length % 10000) {
// Pause FETCHING rows with forEach
client.bulk({ maxRetries: 5, index: 'main', type: 'article', body: articles }).then(() => {
console.log('inserted')
// RESUME FETCHING rows with forEach
console.log("RESUME READING");
})
data = []
}
})
如果您担心不受限制的迭代,则可以使用内部 Meteor._sleepForMs
方法,该方法允许您在 sync-styled 代码中设置异步超时:
Collections.Articles.find().forEach((doc, index) => {
console.log(index, doc._id)
Meteor._sleepForMs(timeout)
})
现在这在 Meteor 环境中工作正常(Meteor.startup
、Meteor.methods
、Meteor.publish
)。
你的 cron 很可能不在这个环境中(= Fiber)所以你可以写一个绑定环境的包装器:
const bound = fct => Meteor.bindEnvironment(fct)
const iterateSlow = bound(function (timeout) {
Collections.Articles.find().forEach((doc, index) => {
console.log(index, doc._id)
Meteor._sleepForMs(timeout)
})
return true
})
iterateSlow(50) // iterates with 50ms timeout
这是一个完整的最小示例,您可以使用新项目重现:
// create a minimal collection
const MyDocs = new Mongo.Collection('myDocs')
// fill the collection
Meteor.startup(() => {
for (let i = 0; i < 100; i++) {
MyDocs.insert({})
}
})
// bind helper
const bound = fct => Meteor.bindEnvironment(fct)
// iterate docs with interval between
const iterateSlow = bound(function (timeout) {
MyDocs.find().forEach((doc, index) => {
console.log(index, doc._id)
Meteor._sleepForMs(timeout)
})
return true
})
// simulate external environment, like when cron runs
setTimeout(() => {
iterateSlow(50)
}, 2000)
设法让它与 ES2018 异步迭代一起工作
从中得到一个想法
这是有效的代码
let articles = []
let cursor = Collections.Articles.find({})
for await (doc of cursor) {
articles.push({
index: {_index: 'main', _type: 'article', _id: doc.id }
},
doc);
if (articles.length === 10000) {
await client.bulk({ maxRetries: 5, index: 'trusted', type: 'artikel', body: articles })
articles = []
}
}
这工作正常,它设法将所有记录插入 Elastic Search 而不会崩溃。
我有连接到 MongoDB 的 Meteor 应用程序。
在 mongo 中,我有一个 table,它有大约 70 万条记录。
我每周都有一个 cron 作业,我从 table 中读取所有记录(使用 Mongo 游标)并以 10k 为一批,我想将它们插入 Elastic Search 以便它们被索引。
let articles = []
Collections.Articles.find({}).forEach(function(doc) {
articles.push({
index: {_index: 'main', _type: 'article', _id: doc.id }
},
doc);
if (0 === articles.length % 10000) {
client.bulk({ maxRetries: 5, index: 'main', type: 'article', body: articles })
data = []
}
})
由于 for each 是同步的,在继续之前遍历每条记录,并且 client.bulk 是异步的,这会使弹性搜索服务器过载,并因内存不足异常而崩溃。
有没有办法在插入完成期间暂停 forEach?我尝试了 async/await 但这似乎并不奏效。
let articles = []
Collections.Articles.find({}).forEach(async function(doc) {
articles.push({
index: {_index: 'main', _type: 'article', _id: doc.id }
},
doc);
if (0 === articles.length % 10000) {
await client.bulk({ maxRetries: 5, index: 'main', type: 'article', body: articles })
data = []
}
})
有什么方法可以实现吗?
编辑:我正在努力实现这样的目标——如果我使用 promises
let articles = []
Collections.Articles.find({}).forEach(function(doc) {
articles.push({
index: {_index: 'main', _type: 'article', _id: doc.id }
},
doc);
if (0 === articles.length % 10000) {
// Pause FETCHING rows with forEach
client.bulk({ maxRetries: 5, index: 'main', type: 'article', body: articles }).then(() => {
console.log('inserted')
// RESUME FETCHING rows with forEach
console.log("RESUME READING");
})
data = []
}
})
如果您担心不受限制的迭代,则可以使用内部 Meteor._sleepForMs
方法,该方法允许您在 sync-styled 代码中设置异步超时:
Collections.Articles.find().forEach((doc, index) => {
console.log(index, doc._id)
Meteor._sleepForMs(timeout)
})
现在这在 Meteor 环境中工作正常(Meteor.startup
、Meteor.methods
、Meteor.publish
)。
你的 cron 很可能不在这个环境中(= Fiber)所以你可以写一个绑定环境的包装器:
const bound = fct => Meteor.bindEnvironment(fct)
const iterateSlow = bound(function (timeout) {
Collections.Articles.find().forEach((doc, index) => {
console.log(index, doc._id)
Meteor._sleepForMs(timeout)
})
return true
})
iterateSlow(50) // iterates with 50ms timeout
这是一个完整的最小示例,您可以使用新项目重现:
// create a minimal collection
const MyDocs = new Mongo.Collection('myDocs')
// fill the collection
Meteor.startup(() => {
for (let i = 0; i < 100; i++) {
MyDocs.insert({})
}
})
// bind helper
const bound = fct => Meteor.bindEnvironment(fct)
// iterate docs with interval between
const iterateSlow = bound(function (timeout) {
MyDocs.find().forEach((doc, index) => {
console.log(index, doc._id)
Meteor._sleepForMs(timeout)
})
return true
})
// simulate external environment, like when cron runs
setTimeout(() => {
iterateSlow(50)
}, 2000)
设法让它与 ES2018 异步迭代一起工作
从中得到一个想法
这是有效的代码
let articles = []
let cursor = Collections.Articles.find({})
for await (doc of cursor) {
articles.push({
index: {_index: 'main', _type: 'article', _id: doc.id }
},
doc);
if (articles.length === 10000) {
await client.bulk({ maxRetries: 5, index: 'trusted', type: 'artikel', body: articles })
articles = []
}
}
这工作正常,它设法将所有记录插入 Elastic Search 而不会崩溃。