Mongo 使用议程有时会延迟数据库队列处理
Mongo db queue processing is delayed at times using agenda
我们正在使用 mongodb 队列进行一些处理,我们正在使用议程调度程序每 3 分钟 运行 从队列中获取一条消息并进行处理。我们观察到的问题是它没有像预期的那样始终如一地工作,有时消息在队列中保留一段时间(甚至没有确认,意味着被拾取)一旦它开始处理队列中的后续消息就得到处理再次处理得更快,直到再次发生延迟。
如果你看一下这个删除的时间戳,最后三个 t运行sactions 在上面 运行 比它之前的要晚得多,而它应该比第 4 条记录晚 3 到 4 分钟处理.
在下面找到我们用来从队列中获取和处理的代码
module.exports = function (agenda_processing) {
var isStatic = false;
agenda_processing.disableRemoteMethodByName('updateAttributes', isStatic);
// agenda_processing = Object.assign(agenda_processing, httpManager);
isStatic = true;
agenda_processing.disableRemoteMethodByName('updateAll', isStatic);
agenda_processing.disableRemoteMethodByName('deleteById', isStatic);
agenda_processing.disableRemoteMethodByName('create', isStatic);
agenda_processing.disableRemoteMethodByName('upsert', isStatic);
agenda_processing.disableRemoteMethodByName('count', isStatic);
agenda_processing.disableRemoteMethodByName('findOne', isStatic);
agenda_processing.disableRemoteMethodByName('exists', isStatic);
agenda_processing.disableRemoteMethodByName('find', isStatic);
agenda_processing.disableRemoteMethodByName('findById', isStatic);
var jobsManager = ''
async function graceful() {
if (jobsManager) {
await jobsManager.stop();
}
setTimeout(() => {
process.exit(0);
}, 1000)
}
process.on('uncaughtException', graceful);
process.on("SIGTERM", graceful);
process.on("SIGINT", graceful);
//To deploy to dev commented out
setTimeout(() => {
setUpJobForProcessingQueue()
}, 3000)
function setUpJobForProcessingQueue () {
const dbUrl = config.spkmsdb.url
jobsManager = awbjobs.init(dbUrl, async () => {
await defineAndStartJobs()
})
//console.log(jobsManager)
};
async function defineAndStartJobs () {
let connector = agenda_processing.app.models.sites.getDataSource().connector.db
queue = processingqueue.initQueue(connector)
var jobNm = "processing-job"
jobsManager.define(jobNm, async function (job,done) {
try {
winstonLogger.info('Agenda: Entering the define callback')
await getDataFromQueueAndProcess(queue)
done()
winstonLogger.info('Agenda: Called done')
} catch(err) {
done(err)
}
}.bind(jobNm))
await jobsManager.every("180 seconds", jobNm) //3 minutes
await jobsManager.start()
winstonLogger.info('awb jobs have been set up')
}
}
议程如下:
const agenda = new Agenda({ db: { address: dbConStr ,options: { useUnifiedTopology: true, useNewUrlParser: true }}});
agenda.on('ready', onReady)
队列内部化为
queuename = 'processing-queue'
const queue = mongoDbQueue(db, queuename, {maxRetries:1, visibility:3600});
解决这种一致性问题的任何帮助都将大有帮助。提前致谢。
问题出在议程初始化程序上,作业从未在 mongo 数据库中创建,因为作业的收集在初始化议程时没有暂停,这导致调度程序从队列中拾取的行为异常不均匀,尤其是当我们有多个应用程序实例使用其自己的调度程序作业时(集合名称将根据实例动态变化)。
const agenda = new Agenda({ db: { address: dbConStr, options: { useUnifiedTopology: true, useNewUrlParser: true }, collection: 'jobscollection' } });
我们正在使用 mongodb 队列进行一些处理,我们正在使用议程调度程序每 3 分钟 运行 从队列中获取一条消息并进行处理。我们观察到的问题是它没有像预期的那样始终如一地工作,有时消息在队列中保留一段时间(甚至没有确认,意味着被拾取)一旦它开始处理队列中的后续消息就得到处理再次处理得更快,直到再次发生延迟。
如果你看一下这个删除的时间戳,最后三个 t运行sactions 在上面 运行 比它之前的要晚得多,而它应该比第 4 条记录晚 3 到 4 分钟处理.
在下面找到我们用来从队列中获取和处理的代码
module.exports = function (agenda_processing) {
var isStatic = false;
agenda_processing.disableRemoteMethodByName('updateAttributes', isStatic);
// agenda_processing = Object.assign(agenda_processing, httpManager);
isStatic = true;
agenda_processing.disableRemoteMethodByName('updateAll', isStatic);
agenda_processing.disableRemoteMethodByName('deleteById', isStatic);
agenda_processing.disableRemoteMethodByName('create', isStatic);
agenda_processing.disableRemoteMethodByName('upsert', isStatic);
agenda_processing.disableRemoteMethodByName('count', isStatic);
agenda_processing.disableRemoteMethodByName('findOne', isStatic);
agenda_processing.disableRemoteMethodByName('exists', isStatic);
agenda_processing.disableRemoteMethodByName('find', isStatic);
agenda_processing.disableRemoteMethodByName('findById', isStatic);
var jobsManager = ''
async function graceful() {
if (jobsManager) {
await jobsManager.stop();
}
setTimeout(() => {
process.exit(0);
}, 1000)
}
process.on('uncaughtException', graceful);
process.on("SIGTERM", graceful);
process.on("SIGINT", graceful);
//To deploy to dev commented out
setTimeout(() => {
setUpJobForProcessingQueue()
}, 3000)
function setUpJobForProcessingQueue () {
const dbUrl = config.spkmsdb.url
jobsManager = awbjobs.init(dbUrl, async () => {
await defineAndStartJobs()
})
//console.log(jobsManager)
};
async function defineAndStartJobs () {
let connector = agenda_processing.app.models.sites.getDataSource().connector.db
queue = processingqueue.initQueue(connector)
var jobNm = "processing-job"
jobsManager.define(jobNm, async function (job,done) {
try {
winstonLogger.info('Agenda: Entering the define callback')
await getDataFromQueueAndProcess(queue)
done()
winstonLogger.info('Agenda: Called done')
} catch(err) {
done(err)
}
}.bind(jobNm))
await jobsManager.every("180 seconds", jobNm) //3 minutes
await jobsManager.start()
winstonLogger.info('awb jobs have been set up')
}
}
议程如下:
const agenda = new Agenda({ db: { address: dbConStr ,options: { useUnifiedTopology: true, useNewUrlParser: true }}});
agenda.on('ready', onReady)
队列内部化为
queuename = 'processing-queue'
const queue = mongoDbQueue(db, queuename, {maxRetries:1, visibility:3600});
解决这种一致性问题的任何帮助都将大有帮助。提前致谢。
问题出在议程初始化程序上,作业从未在 mongo 数据库中创建,因为作业的收集在初始化议程时没有暂停,这导致调度程序从队列中拾取的行为异常不均匀,尤其是当我们有多个应用程序实例使用其自己的调度程序作业时(集合名称将根据实例动态变化)。
const agenda = new Agenda({ db: { address: dbConStr, options: { useUnifiedTopology: true, useNewUrlParser: true }, collection: 'jobscollection' } });