Mongo 使用议程有时会延迟数据库队列处理

Mongo db queue processing is delayed at times using agenda

我们正在使用 mongodb 队列进行一些处理,我们正在使用议程调度程序每 3 分钟 运行 从队列中获取一条消息并进行处理。我们观察到的问题是它没有像预期的那样始终如一地工作,有时消息在队列中保留一段时间(甚至没有确认,意味着被拾取)一旦它开始处理队列中的后续消息就得到处理再次处理得更快,直到再次发生延迟。

如果你看一下这个删除的时间戳,最后三个 t运行sactions 在上面 运行 比它之前的要晚得多,而它应该比第 4 条记录晚 3 到 4 分钟处理.

在下面找到我们用来从队列中获取和处理的代码

module.exports = function (agenda_processing) {

    var isStatic = false;
    agenda_processing.disableRemoteMethodByName('updateAttributes', isStatic);
    // agenda_processing = Object.assign(agenda_processing, httpManager);
    isStatic = true;
    agenda_processing.disableRemoteMethodByName('updateAll', isStatic);
    agenda_processing.disableRemoteMethodByName('deleteById', isStatic);
    agenda_processing.disableRemoteMethodByName('create', isStatic);
    agenda_processing.disableRemoteMethodByName('upsert', isStatic);
    agenda_processing.disableRemoteMethodByName('count', isStatic);
    agenda_processing.disableRemoteMethodByName('findOne', isStatic);
    agenda_processing.disableRemoteMethodByName('exists', isStatic);
    agenda_processing.disableRemoteMethodByName('find', isStatic);
    agenda_processing.disableRemoteMethodByName('findById', isStatic);
    
    var jobsManager = ''
    
    async function graceful() {
        if (jobsManager) {
          await  jobsManager.stop();
        }
        setTimeout(() => {
            process.exit(0);
        }, 1000)
    }
        process.on('uncaughtException', graceful);
        process.on("SIGTERM", graceful);
        process.on("SIGINT", graceful);
    
    //To deploy to dev commented out
    setTimeout(() => {

        setUpJobForProcessingQueue()
    }, 3000)

    function setUpJobForProcessingQueue () {
        const dbUrl = config.spkmsdb.url
        jobsManager = awbjobs.init(dbUrl, async () => {
            await defineAndStartJobs()
        })
        //console.log(jobsManager)
    };

    async function defineAndStartJobs () {
        let connector = agenda_processing.app.models.sites.getDataSource().connector.db
        queue = processingqueue.initQueue(connector)
        var jobNm = "processing-job"
        jobsManager.define(jobNm, async function (job,done) {   
            try {
                winstonLogger.info('Agenda: Entering the define callback')
                await getDataFromQueueAndProcess(queue)
                done()
                winstonLogger.info('Agenda: Called done')
            } catch(err) {
                done(err)
            }
        }.bind(jobNm))
        await jobsManager.every("180 seconds", jobNm) //3 minutes
        await jobsManager.start()
        winstonLogger.info('awb jobs have been set up')
    }
}

议程如下:

const agenda = new Agenda({ db: { address: dbConStr ,options: { useUnifiedTopology: true, useNewUrlParser: true }}});
          agenda.on('ready', onReady)

队列内部化为

queuename =  'processing-queue'
const queue = mongoDbQueue(db, queuename, {maxRetries:1, visibility:3600});

解决这种一致性问题的任何帮助都将大有帮助。提前致谢。

问题出在议程初始化程序上,作业从未在 mongo 数据库中创建,因为作业的收集在初始化议程时没有暂停,这导致调度程序从队列中拾取的行为异常不均匀,尤其是当我们有多个应用程序实例使用其自己的调度程序作业时(集合名称将根据实例动态变化)。

const agenda = new Agenda({ db: { address: dbConStr, options: { useUnifiedTopology: true, useNewUrlParser: true }, collection: 'jobscollection' } });