如何在 express 服务器中 运行 时优雅地关闭 bullmq?
How to gracefully shutdown bullmq when running inside an express server?
- 我有一个使用 bullmq 队列、调度程序和工作程序的快速应用程序。即使在按下 Ctrl + C 后,我仍然可以在我的 Activity 管理器中看到节点进程 运行,但我在终端上的服务器已关闭。我知道这是因为 bullmq 任务开始输出 console.log 语句,即使在服务器关闭到终端后也是如此。
这是我的 server.js 文件的样子
// eslint-disable-next-line import/first
import http from 'http';
import { app } from './app';
import { sessionParser } from './session';
import { websocketServer } from './ws';
import 'jobs/repeatable';
const server = http.createServer(app);
server.on('upgrade', (request, socket, head) => {
sessionParser(request, {}, () => {
websocketServer.handleUpgrade(request, socket, head, (ws) => {
websocketServer.emit('connection', ws, request);
});
});
});
server.on('listening', () => {
websocketServer.emit('listening');
});
server.on('close', () => {
websocketServer.emit('close');
});
//
process.on('SIGINT', () => {
server.close();
});
export { server };
请注意,我在上面定义了一个 SIGINT 处理程序。这是我的工作没有退出的原因吗?我是否必须手动关闭 SIGINT 中的每个队列、工作程序和调度程序?我的 jobs/repeatable.js 文件如下所示
const { scheduleJobs } = require('jobs');
if (process.env.ENABLE_JOB_QUEUE === 'true') {
scheduleJobs();
}
这是我的 jobs.js 文件
import { scheduleDeleteExpiredTokensJob } from './delete-expired-tokens';
import { scheduleDeleteNullVotesJob } from './delete-null-votes';
export async function scheduleJobs() {
await scheduleDeleteExpiredTokensJob();
await scheduleDeleteNullVotesJob();
}
这是我的delete-expired-tokens.js文件,另一个很相似
import { processor as deleteExpiredTokensProcessor } from './processor';
import { queue as deleteExpiredTokensQueue } from './queue';
import { scheduler as deleteExpiredTokensScheduler } from './scheduler';
import { worker as deleteExpiredTokensWorker } from './worker';
export async function scheduleDeleteExpiredTokensJob() {
const jobId = process.env.QUEUE_DELETE_EXPIRED_TOKENS_JOB_ID;
const jobName = process.env.QUEUE_DELETE_EXPIRED_TOKENS_JOB_NAME;
await deleteExpiredTokensQueue.add(jobName, null, {
repeat: {
cron: process.env.QUEUE_DELETE_EXPIRED_TOKENS_FREQUENCY,
jobId,
},
});
}
export {
deleteExpiredTokensProcessor,
deleteExpiredTokensQueue,
deleteExpiredTokensScheduler,
deleteExpiredTokensWorker,
};
如何优雅地关闭 bullmq 任务队列?
您必须对工人调用 close()
方法:
server.on('close', async () => {
websocketServer.emit('close');
// Close the workers
await worker.close()
});
- 我有一个使用 bullmq 队列、调度程序和工作程序的快速应用程序。即使在按下 Ctrl + C 后,我仍然可以在我的 Activity 管理器中看到节点进程 运行,但我在终端上的服务器已关闭。我知道这是因为 bullmq 任务开始输出 console.log 语句,即使在服务器关闭到终端后也是如此。
这是我的 server.js 文件的样子
// eslint-disable-next-line import/first
import http from 'http';
import { app } from './app';
import { sessionParser } from './session';
import { websocketServer } from './ws';
import 'jobs/repeatable';
const server = http.createServer(app);
server.on('upgrade', (request, socket, head) => {
sessionParser(request, {}, () => {
websocketServer.handleUpgrade(request, socket, head, (ws) => {
websocketServer.emit('connection', ws, request);
});
});
});
server.on('listening', () => {
websocketServer.emit('listening');
});
server.on('close', () => {
websocketServer.emit('close');
});
//
process.on('SIGINT', () => {
server.close();
});
export { server };
请注意,我在上面定义了一个 SIGINT 处理程序。这是我的工作没有退出的原因吗?我是否必须手动关闭 SIGINT 中的每个队列、工作程序和调度程序?我的 jobs/repeatable.js 文件如下所示
const { scheduleJobs } = require('jobs');
if (process.env.ENABLE_JOB_QUEUE === 'true') {
scheduleJobs();
}
这是我的 jobs.js 文件
import { scheduleDeleteExpiredTokensJob } from './delete-expired-tokens';
import { scheduleDeleteNullVotesJob } from './delete-null-votes';
export async function scheduleJobs() {
await scheduleDeleteExpiredTokensJob();
await scheduleDeleteNullVotesJob();
}
这是我的delete-expired-tokens.js文件,另一个很相似
import { processor as deleteExpiredTokensProcessor } from './processor';
import { queue as deleteExpiredTokensQueue } from './queue';
import { scheduler as deleteExpiredTokensScheduler } from './scheduler';
import { worker as deleteExpiredTokensWorker } from './worker';
export async function scheduleDeleteExpiredTokensJob() {
const jobId = process.env.QUEUE_DELETE_EXPIRED_TOKENS_JOB_ID;
const jobName = process.env.QUEUE_DELETE_EXPIRED_TOKENS_JOB_NAME;
await deleteExpiredTokensQueue.add(jobName, null, {
repeat: {
cron: process.env.QUEUE_DELETE_EXPIRED_TOKENS_FREQUENCY,
jobId,
},
});
}
export {
deleteExpiredTokensProcessor,
deleteExpiredTokensQueue,
deleteExpiredTokensScheduler,
deleteExpiredTokensWorker,
};
如何优雅地关闭 bullmq 任务队列?
您必须对工人调用 close()
方法:
server.on('close', async () => {
websocketServer.emit('close');
// Close the workers
await worker.close()
});